blob: c26507d0c51285dcb1e2ebff4c3ea74169458933 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds_test
import (
"fmt"
"reflect"
"testing"
"time"
)
import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
networking "istio.io/api/networking/v1alpha3"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pilot/test/xdstest"
"github.com/apache/dubbo-go-pixiu/pkg/adsc"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
const (
routeA = "http.80"
routeB = "https.443.https.my-gateway.testns"
)
func TestStatusEvents(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.Connect(
&model.Proxy{
Metadata: &model.NodeMetadata{
Generator: "event",
},
},
[]string{xds.TypeURLConnect},
[]string{},
)
defer ads.Close()
dr, err := ads.WaitVersion(5*time.Second, xds.TypeURLConnect, "")
if err != nil {
t.Fatal(err)
}
if dr.Resources == nil || len(dr.Resources) == 0 {
t.Error("Expected connections, but not found")
}
// Create a second connection - we should get an event.
ads2 := s.Connect(nil, nil, nil)
defer ads2.Close()
dr, err = ads.WaitVersion(5*time.Second, xds.TypeURLConnect,
dr.VersionInfo)
if err != nil {
t.Fatal(err)
}
if dr.Resources == nil || len(dr.Resources) == 0 {
t.Error("Expected connections, but not found")
}
}
func TestAdsReconnectAfterRestart(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithType(v3.EndpointType)
res := ads.RequestResponseAck(t, &discovery.DiscoveryRequest{ResourceNames: []string{"fake-cluster"}})
// Close the connection and reconnect
ads.Cleanup()
ads = s.ConnectADS().WithType(v3.EndpointType)
// Reconnect with the same resources
ads.RequestResponseAck(t, &discovery.DiscoveryRequest{
ResourceNames: []string{"fake-cluster"},
ResponseNonce: res.Nonce,
VersionInfo: res.VersionInfo,
})
}
func TestAdsUnsubscribe(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithType(v3.EndpointType)
res := ads.RequestResponseAck(t, &discovery.DiscoveryRequest{ResourceNames: []string{"fake-cluster"}})
ads.Request(t, &discovery.DiscoveryRequest{
ResourceNames: nil,
ResponseNonce: res.Nonce,
VersionInfo: res.VersionInfo,
})
ads.ExpectNoResponse(t)
}
// Regression for envoy restart and overlapping connections
func TestAdsReconnect(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithType(v3.ClusterType)
ads.RequestResponseAck(t, nil)
// envoy restarts and reconnects
ads2 := s.ConnectADS().WithType(v3.ClusterType)
ads2.RequestResponseAck(t, nil)
// closes old process
ads.Cleanup()
// event happens, expect push to the remaining connection
xds.AdsPushAll(s.Discovery)
ads2.ExpectResponse(t)
}
// Regression for connection with a bad ID
func TestAdsBadId(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithID("").WithType(v3.ClusterType)
xds.AdsPushAll(s.Discovery)
ads.ExpectNoResponse(t)
}
func TestAdsClusterUpdate(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithType(v3.EndpointType)
version := ""
nonce := ""
sendEDSReqAndVerify := func(clusterName string) {
res := ads.RequestResponseAck(t, &discovery.DiscoveryRequest{
ResourceNames: []string{clusterName},
VersionInfo: version,
ResponseNonce: nonce,
})
version = res.VersionInfo
nonce = res.Nonce
got := xdstest.MapKeys(xdstest.ExtractLoadAssignments(xdstest.UnmarshalClusterLoadAssignment(t, res.Resources)))
if len(got) != 1 {
t.Fatalf("expected 1 response, got %v", len(got))
}
if got[0] != clusterName {
t.Fatalf("expected cluster %v got %v", clusterName, got[0])
}
}
cluster1 := "outbound|80||local.default.svc.cluster.local"
sendEDSReqAndVerify(cluster1)
cluster2 := "outbound|80||hello.default.svc.cluster.local"
sendEDSReqAndVerify(cluster2)
}
// nolint: lll
func TestAdsPushScoping(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
const (
svcSuffix = ".testPushScoping.com"
ns1 = "ns1"
)
removeServiceByNames := func(ns string, names ...string) {
configsUpdated := map[model.ConfigKey]struct{}{}
for _, name := range names {
hostname := host.Name(name)
s.Discovery.MemRegistry.RemoveService(hostname)
configsUpdated[model.ConfigKey{
Kind: gvk.ServiceEntry,
Name: string(hostname),
Namespace: ns,
}] = struct{}{}
}
s.Discovery.ConfigUpdate(&model.PushRequest{Full: true, ConfigsUpdated: configsUpdated})
}
removeService := func(ns string, indexes ...int) {
var names []string
for _, i := range indexes {
names = append(names, fmt.Sprintf("svc%d%s", i, svcSuffix))
}
removeServiceByNames(ns, names...)
}
addServiceByNames := func(ns string, names ...string) {
configsUpdated := map[model.ConfigKey]struct{}{}
for _, name := range names {
hostname := host.Name(name)
configsUpdated[model.ConfigKey{
Kind: gvk.ServiceEntry,
Name: string(hostname),
Namespace: ns,
}] = struct{}{}
s.Discovery.MemRegistry.AddService(&model.Service{
Hostname: hostname,
DefaultAddress: "10.11.0.1",
Ports: []*model.Port{
{
Name: "http-main",
Port: 2080,
Protocol: protocol.HTTP,
},
},
Attributes: model.ServiceAttributes{
Namespace: ns,
},
})
}
s.Discovery.ConfigUpdate(&model.PushRequest{Full: true, ConfigsUpdated: configsUpdated})
}
addService := func(ns string, indexes ...int) {
var hostnames []string
for _, i := range indexes {
hostnames = append(hostnames, fmt.Sprintf("svc%d%s", i, svcSuffix))
}
addServiceByNames(ns, hostnames...)
}
addServiceInstance := func(hostname host.Name, indexes ...int) {
for _, i := range indexes {
s.Discovery.MemRegistry.AddEndpoint(hostname, "http-main", 2080, "192.168.1.10", i)
}
s.Discovery.ConfigUpdate(&model.PushRequest{Full: false, ConfigsUpdated: map[model.ConfigKey]struct{}{
{Kind: gvk.ServiceEntry, Name: string(hostname), Namespace: model.IstioDefaultConfigNamespace}: {},
}})
}
addVirtualService := func(i int, hosts []string, dest string) {
if _, err := s.Store().Create(config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.VirtualService,
Name: fmt.Sprintf("vs%d", i), Namespace: model.IstioDefaultConfigNamespace,
},
Spec: &networking.VirtualService{
Hosts: hosts,
Http: []*networking.HTTPRoute{{
Name: "dest-foo",
Route: []*networking.HTTPRouteDestination{{
Destination: &networking.Destination{
Host: dest,
},
}},
}},
ExportTo: nil,
},
}); err != nil {
t.Fatal(err)
}
}
removeVirtualService := func(i int) {
s.Store().Delete(gvk.VirtualService, fmt.Sprintf("vs%d", i), model.IstioDefaultConfigNamespace, nil)
}
addDelegateVirtualService := func(i int, hosts []string, dest string) {
if _, err := s.Store().Create(config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.VirtualService,
Name: fmt.Sprintf("rootvs%d", i), Namespace: model.IstioDefaultConfigNamespace,
},
Spec: &networking.VirtualService{
Hosts: hosts,
Http: []*networking.HTTPRoute{{
Name: "dest-foo",
Delegate: &networking.Delegate{
Name: fmt.Sprintf("delegatevs%d", i),
Namespace: model.IstioDefaultConfigNamespace,
},
}},
ExportTo: nil,
},
}); err != nil {
t.Fatal(err)
}
if _, err := s.Store().Create(config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.VirtualService,
Name: fmt.Sprintf("delegatevs%d", i), Namespace: model.IstioDefaultConfigNamespace,
},
Spec: &networking.VirtualService{
Http: []*networking.HTTPRoute{{
Name: "dest-foo",
Route: []*networking.HTTPRouteDestination{{
Destination: &networking.Destination{
Host: dest,
},
}},
}},
ExportTo: nil,
},
}); err != nil {
t.Fatal(err)
}
}
updateDelegateVirtualService := func(i int, dest string) {
if _, err := s.Store().Update(config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.VirtualService,
Name: fmt.Sprintf("delegatevs%d", i), Namespace: model.IstioDefaultConfigNamespace,
},
Spec: &networking.VirtualService{
Http: []*networking.HTTPRoute{{
Name: "dest-foo",
Headers: &networking.Headers{
Request: &networking.Headers_HeaderOperations{
Remove: []string{"any-string"},
},
},
Route: []*networking.HTTPRouteDestination{
{
Destination: &networking.Destination{
Host: dest,
},
},
},
}},
ExportTo: nil,
},
}); err != nil {
t.Fatal(err)
}
}
removeDelegateVirtualService := func(i int) {
s.Store().Delete(gvk.VirtualService, fmt.Sprintf("rootvs%d", i), model.IstioDefaultConfigNamespace, nil)
s.Store().Delete(gvk.VirtualService, fmt.Sprintf("delegatevs%d", i), model.IstioDefaultConfigNamespace, nil)
}
addDestinationRule := func(i int, host string) {
if _, err := s.Store().Create(config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.DestinationRule,
Name: fmt.Sprintf("dr%d", i), Namespace: model.IstioDefaultConfigNamespace,
},
Spec: &networking.DestinationRule{
Host: host,
ExportTo: nil,
},
}); err != nil {
t.Fatal(err)
}
}
removeDestinationRule := func(i int) {
s.Store().Delete(gvk.DestinationRule, fmt.Sprintf("dr%d", i), model.IstioDefaultConfigNamespace, nil)
}
sc := &networking.Sidecar{
Egress: []*networking.IstioEgressListener{
{
Hosts: []string{model.IstioDefaultConfigNamespace + "/*" + svcSuffix},
},
},
}
scc := config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.Sidecar,
Name: "sc", Namespace: model.IstioDefaultConfigNamespace,
},
Spec: sc,
}
notMatchedScc := config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.Sidecar,
Name: "notMatchedSc", Namespace: model.IstioDefaultConfigNamespace,
},
Spec: &networking.Sidecar{
WorkloadSelector: &networking.WorkloadSelector{
Labels: map[string]string{"notMatched": "notMatched"},
},
},
}
if _, err := s.Store().Create(scc); err != nil {
t.Fatal(err)
}
addService(model.IstioDefaultConfigNamespace, 1, 2, 3)
adscConn := s.Connect(nil, nil, nil)
defer adscConn.Close()
type svcCase struct {
desc string
ev model.Event
svcIndexes []int
svcNames []string
ns string
instIndexes []struct {
name string
indexes []int
}
vsIndexes []struct {
index int
hosts []string
dest string
}
delegatevsIndexes []struct {
index int
hosts []string
dest string
}
drIndexes []struct {
index int
host string
}
cfgs []config.Config
expectedUpdates []string
unexpectedUpdates []string
}
svcCases := []svcCase{
{
desc: "Add a scoped service",
ev: model.EventAdd,
svcIndexes: []int{4},
ns: model.IstioDefaultConfigNamespace,
expectedUpdates: []string{v3.ListenerType},
}, // then: default 1,2,3,4
{
desc: "Add instances to a scoped service",
ev: model.EventAdd,
instIndexes: []struct {
name string
indexes []int
}{{fmt.Sprintf("svc%d%s", 4, svcSuffix), []int{1, 2}}},
ns: model.IstioDefaultConfigNamespace,
expectedUpdates: []string{v3.EndpointType},
}, // then: default 1,2,3,4
{
desc: "Add virtual service to a scoped service",
ev: model.EventAdd,
vsIndexes: []struct {
index int
hosts []string
dest string
}{{index: 4, hosts: []string{fmt.Sprintf("svc%d%s", 4, svcSuffix)}, dest: "unknown-svc"}},
expectedUpdates: []string{v3.ListenerType},
},
{
desc: "Delete virtual service of a scoped service",
ev: model.EventDelete,
vsIndexes: []struct {
index int
hosts []string
dest string
}{{index: 4}},
expectedUpdates: []string{v3.ListenerType},
},
{
desc: "Add destination rule to a scoped service",
ev: model.EventAdd,
drIndexes: []struct {
index int
host string
}{{4, fmt.Sprintf("svc%d%s", 4, svcSuffix)}},
expectedUpdates: []string{v3.ClusterType},
},
{
desc: "Delete destination rule of a scoped service",
ev: model.EventDelete,
drIndexes: []struct {
index int
host string
}{{index: 4}},
expectedUpdates: []string{v3.ClusterType},
},
{
desc: "Add a unscoped(name not match) service",
ev: model.EventAdd,
svcNames: []string{"foo.com"},
ns: model.IstioDefaultConfigNamespace,
unexpectedUpdates: []string{v3.ClusterType},
}, // then: default 1,2,3,4, foo.com; ns1: 11
{
desc: "Add instances to an unscoped service",
ev: model.EventAdd,
instIndexes: []struct {
name string
indexes []int
}{{"foo.com", []int{1, 2}}},
ns: model.IstioDefaultConfigNamespace,
unexpectedUpdates: []string{v3.EndpointType},
}, // then: default 1,2,3,4
{
desc: "Add a unscoped(ns not match) service",
ev: model.EventAdd,
svcIndexes: []int{11},
ns: ns1,
unexpectedUpdates: []string{v3.ClusterType},
}, // then: default 1,2,3,4, foo.com; ns1: 11
{
desc: "Add virtual service to an unscoped service",
ev: model.EventAdd,
vsIndexes: []struct {
index int
hosts []string
dest string
}{{index: 0, hosts: []string{"foo.com"}, dest: "unknown-service"}},
unexpectedUpdates: []string{v3.ClusterType},
},
{
desc: "Delete virtual service of a unscoped service",
ev: model.EventDelete,
vsIndexes: []struct {
index int
hosts []string
dest string
}{{index: 0}},
unexpectedUpdates: []string{v3.ClusterType},
},
{
desc: "Add destination rule to an unscoped service",
ev: model.EventAdd,
drIndexes: []struct {
index int
host string
}{{0, "foo.com"}},
unexpectedUpdates: []string{v3.ClusterType},
},
{
desc: "Delete destination rule of a unscoped service",
ev: model.EventDelete,
drIndexes: []struct {
index int
host string
}{{index: 0}},
unexpectedUpdates: []string{v3.ClusterType},
},
{
desc: "Add virtual service for scoped service with transitively scoped dest svc",
ev: model.EventAdd,
vsIndexes: []struct {
index int
hosts []string
dest string
}{{index: 4, hosts: []string{fmt.Sprintf("svc%d%s", 4, svcSuffix)}, dest: "foo.com"}},
expectedUpdates: []string{v3.ClusterType, v3.EndpointType},
},
{
desc: "Add instances for transitively scoped svc",
ev: model.EventAdd,
instIndexes: []struct {
name string
indexes []int
}{{"foo.com", []int{1, 2}}},
ns: model.IstioDefaultConfigNamespace,
expectedUpdates: []string{v3.EndpointType},
},
{
desc: "Delete virtual service for scoped service with transitively scoped dest svc",
ev: model.EventDelete,
vsIndexes: []struct {
index int
hosts []string
dest string
}{{index: 4}},
expectedUpdates: []string{v3.ClusterType},
},
{
desc: "Add delegation virtual service for scoped service with transitively scoped dest svc",
ev: model.EventAdd,
delegatevsIndexes: []struct {
index int
hosts []string
dest string
}{{index: 4, hosts: []string{fmt.Sprintf("svc%d%s", 4, svcSuffix)}, dest: "foo.com"}},
expectedUpdates: []string{v3.ListenerType, v3.RouteType, v3.ClusterType, v3.EndpointType},
},
{
desc: "Update delegate virtual service should trigger full push",
ev: model.EventUpdate,
delegatevsIndexes: []struct {
index int
hosts []string
dest string
}{{index: 4, hosts: []string{fmt.Sprintf("svc%d%s", 4, svcSuffix)}, dest: "foo.com"}},
expectedUpdates: []string{v3.ListenerType, v3.RouteType, v3.ClusterType},
},
{
desc: "Delete delegate virtual service for scoped service with transitively scoped dest svc",
ev: model.EventDelete,
delegatevsIndexes: []struct {
index int
hosts []string
dest string
}{{index: 4}},
expectedUpdates: []string{v3.ListenerType, v3.RouteType, v3.ClusterType},
},
{
desc: "Remove a scoped service",
ev: model.EventDelete,
svcIndexes: []int{4},
ns: model.IstioDefaultConfigNamespace,
expectedUpdates: []string{v3.ListenerType},
}, // then: default 1,2,3, foo.com; ns: 11
{
desc: "Remove a unscoped(name not match) service",
ev: model.EventDelete,
svcNames: []string{"foo.com"},
ns: model.IstioDefaultConfigNamespace,
unexpectedUpdates: []string{v3.ClusterType},
}, // then: default 1,2,3; ns1: 11
{
desc: "Remove a unscoped(ns not match) service",
ev: model.EventDelete,
svcIndexes: []int{11},
ns: ns1,
unexpectedUpdates: []string{v3.ClusterType},
}, // then: default 1,2,3
{
desc: "Add an unmatched Sidecar config",
ev: model.EventAdd,
cfgs: []config.Config{notMatchedScc},
ns: model.IstioDefaultConfigNamespace,
unexpectedUpdates: []string{v3.ListenerType, v3.RouteType, v3.ClusterType, v3.EndpointType},
},
{
desc: "Update the Sidecar config",
ev: model.EventUpdate,
cfgs: []config.Config{scc},
ns: model.IstioDefaultConfigNamespace,
expectedUpdates: []string{v3.ListenerType, v3.RouteType, v3.ClusterType, v3.EndpointType},
},
}
for _, c := range svcCases {
t.Run(c.desc, func(t *testing.T) {
// Let events from previous tests complete
time.Sleep(time.Millisecond * 50)
adscConn.WaitClear()
var wantUpdates []string
wantUpdates = append(wantUpdates, c.expectedUpdates...)
wantUpdates = append(wantUpdates, c.unexpectedUpdates...)
switch c.ev {
case model.EventAdd:
if len(c.svcIndexes) > 0 {
addService(c.ns, c.svcIndexes...)
}
if len(c.svcNames) > 0 {
addServiceByNames(c.ns, c.svcNames...)
}
if len(c.instIndexes) > 0 {
for _, instIndex := range c.instIndexes {
addServiceInstance(host.Name(instIndex.name), instIndex.indexes...)
}
}
if len(c.vsIndexes) > 0 {
for _, vsIndex := range c.vsIndexes {
addVirtualService(vsIndex.index, vsIndex.hosts, vsIndex.dest)
}
}
if len(c.delegatevsIndexes) > 0 {
for _, vsIndex := range c.delegatevsIndexes {
addDelegateVirtualService(vsIndex.index, vsIndex.hosts, vsIndex.dest)
}
}
if len(c.drIndexes) > 0 {
for _, drIndex := range c.drIndexes {
addDestinationRule(drIndex.index, drIndex.host)
}
}
if len(c.cfgs) > 0 {
for _, cfg := range c.cfgs {
if _, err := s.Store().Create(cfg); err != nil {
t.Fatal(err)
}
}
}
case model.EventUpdate:
if len(c.delegatevsIndexes) > 0 {
for _, vsIndex := range c.delegatevsIndexes {
updateDelegateVirtualService(vsIndex.index, vsIndex.dest)
}
}
if len(c.cfgs) > 0 {
for _, cfg := range c.cfgs {
if _, err := s.Store().Update(cfg); err != nil {
t.Fatal(err)
}
}
}
case model.EventDelete:
if len(c.svcIndexes) > 0 {
removeService(c.ns, c.svcIndexes...)
}
if len(c.svcNames) > 0 {
removeServiceByNames(c.ns, c.svcNames...)
}
if len(c.vsIndexes) > 0 {
for _, vsIndex := range c.vsIndexes {
removeVirtualService(vsIndex.index)
}
}
if len(c.delegatevsIndexes) > 0 {
for _, vsIndex := range c.delegatevsIndexes {
removeDelegateVirtualService(vsIndex.index)
}
}
if len(c.drIndexes) > 0 {
for _, drIndex := range c.drIndexes {
removeDestinationRule(drIndex.index)
}
}
default:
t.Fatalf("wrong event for case %v", c)
}
timeout := time.Millisecond * 200
upd, _ := adscConn.Wait(timeout, wantUpdates...)
for _, expect := range c.expectedUpdates {
if !contains(upd, expect) {
t.Fatalf("expected update %s not in updates %v", expect, upd)
}
}
for _, unexpect := range c.unexpectedUpdates {
if contains(upd, unexpect) {
t.Fatalf("expected to not get update %s, but it is in updates %v", unexpect, upd)
}
}
})
}
}
func TestAdsUpdate(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS()
s.Discovery.MemRegistry.AddService(&model.Service{
Hostname: "adsupdate.default.svc.cluster.local",
DefaultAddress: "10.11.0.1",
Ports: []*model.Port{
{
Name: "http-main",
Port: 2080,
Protocol: protocol.HTTP,
},
},
Attributes: model.ServiceAttributes{
Name: "adsupdate",
Namespace: "default",
},
})
s.Discovery.ConfigUpdate(&model.PushRequest{Full: true})
time.Sleep(time.Millisecond * 200)
s.Discovery.MemRegistry.SetEndpoints("adsupdate.default.svc.cluster.local", "default",
newEndpointWithAccount("10.2.0.1", "hello-sa", "v1"))
cluster := "outbound|2080||adsupdate.default.svc.cluster.local"
res := ads.RequestResponseAck(t, &discovery.DiscoveryRequest{
ResourceNames: []string{cluster},
TypeUrl: v3.EndpointType,
})
eps, f := xdstest.ExtractLoadAssignments(xdstest.UnmarshalClusterLoadAssignment(t, res.GetResources()))[cluster]
if !f {
t.Fatalf("did not find cluster %v", cluster)
}
if !reflect.DeepEqual(eps, []string{"10.2.0.1:80"}) {
t.Fatalf("expected endpoints [10.2.0.1:80] got %v", eps)
}
_ = s.Discovery.MemRegistry.AddEndpoint("adsupdate.default.svc.cluster.local",
"http-main", 2080, "10.1.7.1", 1080)
// will trigger recompute and push for all clients - including some that may be closing
// This reproduced the 'push on closed connection' bug.
xds.AdsPushAll(s.Discovery)
res1 := ads.ExpectResponse(t)
xdstest.UnmarshalClusterLoadAssignment(t, res1.GetResources())
}
func TestEnvoyRDSProtocolError(t *testing.T) {
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithType(v3.RouteType)
ads.RequestResponseAck(t, &discovery.DiscoveryRequest{ResourceNames: []string{routeA}})
xds.AdsPushAll(s.Discovery)
res := ads.ExpectResponse(t)
// send empty response and validate no response is returned.
ads.Request(t, &discovery.DiscoveryRequest{
ResourceNames: nil,
VersionInfo: res.VersionInfo,
ResponseNonce: res.Nonce,
})
ads.ExpectNoResponse(t)
// Refresh routes
ads.Request(t, &discovery.DiscoveryRequest{
ResourceNames: []string{routeA, routeB},
VersionInfo: res.VersionInfo,
ResponseNonce: res.Nonce,
})
}
func TestEnvoyRDSUpdatedRouteRequest(t *testing.T) {
expectRoutes := func(resp *discovery.DiscoveryResponse, expected ...string) {
t.Helper()
got := xdstest.MapKeys(xdstest.ExtractRouteConfigurations(xdstest.UnmarshalRouteConfiguration(t, resp.Resources)))
if !reflect.DeepEqual(expected, got) {
t.Fatalf("expected routes %v got %v", expected, got)
}
}
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithType(v3.RouteType)
resp := ads.RequestResponseAck(t, &discovery.DiscoveryRequest{ResourceNames: []string{routeA}})
expectRoutes(resp, routeA)
xds.AdsPushAll(s.Discovery)
resp = ads.ExpectResponse(t)
expectRoutes(resp, routeA)
// Test update from A -> B
resp = ads.RequestResponseAck(t, &discovery.DiscoveryRequest{ResourceNames: []string{routeB}})
expectRoutes(resp, routeB)
// Test update from B -> A, B
resp = ads.RequestResponseAck(t, &discovery.DiscoveryRequest{ResourceNames: []string{routeA, routeB}})
expectRoutes(resp, routeA, routeB)
// Test update from B, B -> A
resp = ads.RequestResponseAck(t, &discovery.DiscoveryRequest{ResourceNames: []string{routeA}})
expectRoutes(resp, routeA)
}
func TestEdsCache(t *testing.T) {
makeEndpoint := func(addr []*networking.WorkloadEntry) config.Config {
return config.Config{
Meta: config.Meta{
Name: "service",
Namespace: "default",
GroupVersionKind: gvk.ServiceEntry,
},
Spec: &networking.ServiceEntry{
Hosts: []string{"foo.com"},
Ports: []*networking.Port{{
Number: 80,
Protocol: "HTTP",
Name: "http",
}},
Resolution: networking.ServiceEntry_STATIC,
Endpoints: addr,
},
}
}
assertEndpoints := func(a *adsc.ADSC, addr ...string) {
t.Helper()
retry.UntilSuccessOrFail(t, func() error {
got := sets.New(xdstest.ExtractEndpoints(a.GetEndpoints()["outbound|80||foo.com"])...)
want := sets.New(addr...)
if !got.Equals(want) {
return fmt.Errorf("invalid endpoints, got %v want %v", got, addr)
}
return nil
}, retry.Timeout(time.Second*5))
}
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{
Configs: []config.Config{
makeEndpoint([]*networking.WorkloadEntry{
{Address: "1.2.3.4", Locality: "region/zone"},
{Address: "1.2.3.5", Locality: "notmatch"},
}),
},
})
ads := s.Connect(&model.Proxy{Locality: &core.Locality{Region: "region"}}, nil, watchAll)
assertEndpoints(ads, "1.2.3.4:80", "1.2.3.5:80")
t.Logf("endpoints: %+v", xdstest.ExtractEndpoints(ads.GetEndpoints()["outbound|80||foo.com"]))
if _, err := s.Store().Update(makeEndpoint([]*networking.WorkloadEntry{
{Address: "1.2.3.6", Locality: "region/zone"},
{Address: "1.2.3.5", Locality: "notmatch"},
})); err != nil {
t.Fatal(err)
}
if _, err := ads.Wait(time.Second*5, v3.EndpointType); err != nil {
t.Fatal(err)
}
assertEndpoints(ads, "1.2.3.6:80", "1.2.3.5:80")
t.Logf("endpoints: %+v", xdstest.ExtractEndpoints(ads.GetEndpoints()["outbound|80||foo.com"]))
ads.WaitClear()
if _, err := s.Store().Create(config.Config{
Meta: config.Meta{
Name: "service",
Namespace: "default",
GroupVersionKind: gvk.DestinationRule,
},
Spec: &networking.DestinationRule{
Host: "foo.com",
TrafficPolicy: &networking.TrafficPolicy{
OutlierDetection: &networking.OutlierDetection{},
},
},
}); err != nil {
t.Fatal(err)
}
if _, err := ads.Wait(time.Second*5, v3.EndpointType); err != nil {
t.Fatal(err)
}
assertEndpoints(ads, "1.2.3.6:80", "1.2.3.5:80")
retry.UntilSuccessOrFail(t, func() error {
found := false
for _, ep := range ads.GetEndpoints()["outbound|80||foo.com"].Endpoints {
if ep.Priority == 1 {
found = true
}
}
if !found {
return fmt.Errorf("locality did not update")
}
return nil
}, retry.Timeout(time.Second*5))
ads.WaitClear()
ep := makeEndpoint([]*networking.WorkloadEntry{{Address: "1.2.3.6", Locality: "region/zone"}, {Address: "1.2.3.5", Locality: "notmatch"}})
ep.Spec.(*networking.ServiceEntry).Resolution = networking.ServiceEntry_DNS
if _, err := s.Store().Update(ep); err != nil {
t.Fatal(err)
}
if _, err := ads.Wait(time.Second*5, v3.EndpointType); err != nil {
t.Fatal(err)
}
assertEndpoints(ads)
t.Logf("endpoints: %+v", ads.GetEndpoints())
}