blob: c59903e6c7db1239471e8298610dcbf820a8ce26 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds_test
import (
"fmt"
"sort"
"testing"
"time"
)
import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/protobuf/types/known/structpb"
meshconfig "istio.io/api/mesh/v1alpha1"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pilot/test/xdstest"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
"github.com/apache/dubbo-go-pixiu/pkg/network"
)
// Testing the Split Horizon EDS.
type expectedResults struct {
weights map[string]uint32
}
func (r expectedResults) getAddrs() []string {
var out []string
for addr := range r.weights {
out = append(out, addr)
}
sort.Strings(out)
return out
}
// The test will setup 3 networks with various number of endpoints for the same service within
// each network. It creates an instance of memory registry for each cluster and populate it
// with Service, Instances and an ingress gateway service.
// It then conducts an EDS query from each network expecting results to match the design of
// the Split Horizon EDS - all local endpoints + endpoint per remote network that also has
// endpoints for the service.
func TestSplitHorizonEds(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{NetworksWatcher: mesh.NewFixedNetworksWatcher(nil)})
// Set up a cluster registry for network 1 with 1 instance for the service 'service5'
// Network has 1 gateway
initRegistry(s, 1, []string{"159.122.219.1"}, 1)
// Set up a cluster registry for network 2 with 2 instances for the service 'service5'
// Network has 1 gateway
initRegistry(s, 2, []string{"159.122.219.2"}, 2)
// Set up a cluster registry for network 3 with 3 instances for the service 'service5'
// Network has 2 gateways
initRegistry(s, 3, []string{"159.122.219.3", "179.114.119.3"}, 3)
// Set up a cluster registry for network 4 with 4 instances for the service 'service5'
// but without any gateway, which is treated as accessible directly.
initRegistry(s, 4, []string{}, 4)
// Push contexts needs to be updated
s.Discovery.ConfigUpdate(&model.PushRequest{Full: true})
time.Sleep(time.Millisecond * 200) // give time for cache to clear
fmt.Println("gateways", s.Env().NetworkManager.AllGateways())
tests := []struct {
network string
sidecarID string
want expectedResults
}{
{
// Verify that EDS from network1 will return 1 local endpoint with local VIP + 2 remote
// endpoints weighted accordingly with the IP of the ingress gateway.
network: "network1",
sidecarID: sidecarID("10.1.0.1", "app3"),
want: expectedResults{
weights: map[string]uint32{
// 1 local endpoint
"10.1.0.1": 2,
// 2 endopints on network 2, go through single gateway.
"159.122.219.2": 4,
// 3 endpoints on network 3, weights split across 2 gateways.
"159.122.219.3": 3,
"179.114.119.3": 3,
// no gateway defined for network 4 - treat as directly reachable.
"10.4.0.1": 2,
"10.4.0.2": 2,
"10.4.0.3": 2,
"10.4.0.4": 2,
},
},
},
{
// Verify that EDS from network2 will return 2 local endpoints with local VIPs + 2 remote
// endpoints weighted accordingly with the IP of the ingress gateway.
network: "network2",
sidecarID: sidecarID("10.2.0.1", "app3"),
want: expectedResults{
weights: map[string]uint32{
// 2 local endpoints
"10.2.0.1": 2,
"10.2.0.2": 2,
// 1 endpoint on network 1, accessed via gateway.
"159.122.219.1": 2,
// 3 endpoints on network 3, weights split across 2 gateways.
"159.122.219.3": 3,
"179.114.119.3": 3,
// no gateway defined for network 4 - treat as directly reachable.
"10.4.0.1": 2,
"10.4.0.2": 2,
"10.4.0.3": 2,
"10.4.0.4": 2,
},
},
},
{
// Verify that EDS from network3 will return 3 local endpoints with local VIPs + 2 remote
// endpoints weighted accordingly with the IP of the ingress gateway.
network: "network3",
sidecarID: sidecarID("10.3.0.1", "app3"),
want: expectedResults{
weights: map[string]uint32{
// 3 local endpoints.
"10.3.0.1": 2,
"10.3.0.2": 2,
"10.3.0.3": 2,
// 1 endpoint on network 1, accessed via gateway.
"159.122.219.1": 2,
// 2 endpoint on network 2, accessed via gateway.
"159.122.219.2": 4,
// no gateway defined for network 4 - treat as directly reachable.
"10.4.0.1": 2,
"10.4.0.2": 2,
"10.4.0.3": 2,
"10.4.0.4": 2,
},
},
},
{
// Verify that EDS from network4 will return 4 local endpoint with local VIP + 4 remote
// endpoints weighted accordingly with the IP of the ingress gateway.
network: "network4",
sidecarID: sidecarID("10.4.0.1", "app3"),
want: expectedResults{
weights: map[string]uint32{
// 4 local endpoints.
"10.4.0.1": 2,
"10.4.0.2": 2,
"10.4.0.3": 2,
"10.4.0.4": 2,
// 1 endpoint on network 1, accessed via gateway.
"159.122.219.1": 2,
// 2 endpoint on network 2, accessed via gateway.
"159.122.219.2": 4,
// 3 endpoints on network 3, weights split across 2 gateways.
"159.122.219.3": 3,
"179.114.119.3": 3,
},
},
},
}
for _, tt := range tests {
t.Run("from "+tt.network, func(t *testing.T) {
verifySplitHorizonResponse(t, s, tt.network, tt.sidecarID, tt.want)
})
}
}
// Tests whether an EDS response from the provided network matches the expected results
func verifySplitHorizonResponse(t *testing.T, s *xds.FakeDiscoveryServer, network string, sidecarID string, expected expectedResults) {
t.Helper()
ads := s.ConnectADS().WithID(sidecarID)
metadata := &structpb.Struct{Fields: map[string]*structpb.Value{
"ISTIO_VERSION": {Kind: &structpb.Value_StringValue{StringValue: "1.3"}},
"NETWORK": {Kind: &structpb.Value_StringValue{StringValue: network}},
}}
ads.RequestResponseAck(t, &discovery.DiscoveryRequest{
Node: &core.Node{
Id: ads.ID,
Metadata: metadata,
},
TypeUrl: v3.ClusterType,
})
clusterName := "outbound|1080||service5.default.svc.cluster.local"
res := ads.RequestResponseAck(t, &discovery.DiscoveryRequest{
Node: &core.Node{
Id: ads.ID,
Metadata: metadata,
},
TypeUrl: v3.EndpointType,
ResourceNames: []string{clusterName},
})
cla := xdstest.UnmarshalClusterLoadAssignment(t, res.Resources)[0]
eps := cla.Endpoints
if len(eps) != 1 {
t.Fatalf("expecting 1 locality endpoint but got %d", len(eps))
}
lbEndpoints := eps[0].LbEndpoints
if len(lbEndpoints) != len(expected.weights) {
t.Fatalf("unexpected number of endpoints.\nWant:\n%v\nGot:\n%v", expected.getAddrs(), getLbEndpointAddrs(lbEndpoints))
}
for addr, weight := range expected.weights {
var match *endpoint.LbEndpoint
for _, ep := range lbEndpoints {
if ep.GetEndpoint().Address.GetSocketAddress().Address == addr {
match = ep
break
}
}
if match == nil {
t.Fatalf("couldn't find endpoint with address %s: found %v", addr, getLbEndpointAddrs(lbEndpoints))
}
if match.LoadBalancingWeight.Value != weight {
t.Errorf("weight for endpoint %s is expected to be %d but got %d", addr, weight, match.LoadBalancingWeight.Value)
}
}
}
// initRegistry creates and initializes a memory registry that holds a single
// service with the provided amount of endpoints. It also creates a service for
// the ingress with the provided external IP
func initRegistry(server *xds.FakeDiscoveryServer, networkNum int, gatewaysIP []string, numOfEndpoints int) {
clusterID := cluster.ID(fmt.Sprintf("cluster%d", networkNum))
networkID := network.ID(fmt.Sprintf("network%d", networkNum))
memRegistry := memory.NewServiceDiscovery()
memRegistry.EDSUpdater = server.Discovery
server.Env().ServiceDiscovery.(*aggregate.Controller).AddRegistry(serviceregistry.Simple{
ClusterID: clusterID,
ProviderID: provider.Mock,
ServiceDiscovery: memRegistry,
Controller: &memory.ServiceController{},
})
gws := make([]*meshconfig.Network_IstioNetworkGateway, 0)
for _, gatewayIP := range gatewaysIP {
if gatewayIP != "" {
gw := &meshconfig.Network_IstioNetworkGateway{
Gw: &meshconfig.Network_IstioNetworkGateway_Address{
Address: gatewayIP,
},
Port: 80,
}
gws = append(gws, gw)
}
}
if len(gws) != 0 {
addNetwork(server, networkID, &meshconfig.Network{
Gateways: gws,
})
}
svcLabels := map[string]string{
"version": "v1.1",
}
// Explicit test service, in the v2 memory registry. Similar with mock.MakeService,
// but easier to read.
memRegistry.AddService(&model.Service{
Hostname: "service5.default.svc.cluster.local",
DefaultAddress: "10.10.0.1",
Ports: []*model.Port{
{
Name: "http-main",
Port: 1080,
Protocol: protocol.HTTP,
},
},
})
istioEndpoints := make([]*model.IstioEndpoint, numOfEndpoints)
for i := 0; i < numOfEndpoints; i++ {
addr := fmt.Sprintf("10.%d.0.%d", networkNum, i+1)
istioEndpoints[i] = &model.IstioEndpoint{
Address: addr,
EndpointPort: 2080,
ServicePortName: "http-main",
Network: networkID,
Locality: model.Locality{
Label: "az",
ClusterID: clusterID,
},
Labels: svcLabels,
TLSMode: model.IstioMutualTLSModeLabel,
}
}
memRegistry.SetEndpoints("service5.default.svc.cluster.local", "default", istioEndpoints)
}
func addNetwork(server *xds.FakeDiscoveryServer, id network.ID, network *meshconfig.Network) {
meshNetworks := server.Env().NetworksWatcher.Networks()
// copy old networks if they exist
c := map[string]*meshconfig.Network{}
if meshNetworks != nil {
for k, v := range meshNetworks.Networks {
c[k] = v
}
}
// add the new one
c[string(id)] = network
server.Env().NetworksWatcher.SetNetworks(&meshconfig.MeshNetworks{Networks: c})
}
func getLbEndpointAddrs(eps []*endpoint.LbEndpoint) []string {
addrs := make([]string, 0)
for _, lbEp := range eps {
addrs = append(addrs, lbEp.GetEndpoint().Address.GetSocketAddress().Address)
}
sort.Strings(addrs)
return addrs
}