| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package xds_test |
| |
| import ( |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io" |
| "net/http" |
| "net/http/httptest" |
| "os" |
| "path/filepath" |
| "reflect" |
| "runtime" |
| "sort" |
| "strconv" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| ) |
| |
| import ( |
| endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" |
| uatomic "go.uber.org/atomic" |
| "istio.io/pkg/log" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds" |
| v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3" |
| "github.com/apache/dubbo-go-pixiu/pilot/test/xdstest" |
| "github.com/apache/dubbo-go-pixiu/pkg/adsc" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/host" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/protocol" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk" |
| "github.com/apache/dubbo-go-pixiu/pkg/test" |
| "github.com/apache/dubbo-go-pixiu/pkg/test/env" |
| "github.com/apache/dubbo-go-pixiu/pkg/util/sets" |
| ) |
| |
| // The connect and reconnect tests were removed - ADS already has coverage, and |
| // StreamEndpoints is not used in 1.0+. |
| const ( |
| asdcLocality = "region1/zone1/subzone1" |
| asdc2Locality = "region2/zone2/subzone2" |
| |
| edsIncSvc = "eds.test.svc.cluster.local" |
| edsIncVip = "10.10.1.2" |
| ) |
| |
| func TestIncrementalPush(t *testing.T) { |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{ |
| ConfigString: mustReadFile(t, "tests/testdata/config/destination-rule-all.yaml") + |
| mustReadFile(t, "tests/testdata/config/static-weighted-se.yaml"), |
| }) |
| ads := s.Connect(nil, nil, watchAll) |
| t.Run("Full Push", func(t *testing.T) { |
| s.Discovery.Push(&model.PushRequest{Full: true}) |
| if _, err := ads.Wait(time.Second*5, watchAll...); err != nil { |
| t.Fatal(err) |
| } |
| }) |
| t.Run("Incremental Push with updated services", func(t *testing.T) { |
| ads.WaitClear() |
| s.Discovery.Push(&model.PushRequest{ |
| Full: false, |
| ConfigsUpdated: map[model.ConfigKey]struct{}{ |
| {Name: "destall.default.svc.cluster.local", Namespace: "testns", Kind: gvk.ServiceEntry}: {}, |
| }, |
| }) |
| if err := ads.WaitSingle(time.Second*5, v3.EndpointType, v3.ClusterType); err != nil { |
| t.Fatal(err) |
| } |
| }) |
| t.Run("Full Push with updated services", func(t *testing.T) { |
| ads.WaitClear() |
| |
| s.Discovery.Push(&model.PushRequest{ |
| Full: true, |
| ConfigsUpdated: map[model.ConfigKey]struct{}{ |
| {Name: "weighted.static.svc.cluster.local", Namespace: "default", Kind: gvk.ServiceEntry}: {}, |
| }, |
| }) |
| if _, err := ads.Wait(time.Second*5, watchAll...); err != nil { |
| t.Fatal(err) |
| } |
| if len(ads.GetEndpoints()) != 4 { |
| t.Fatalf("Expected a partial EDS update, but got: %v", xdstest.MapKeys(ads.GetEndpoints())) |
| } |
| }) |
| t.Run("Full Push with multiple updates", func(t *testing.T) { |
| ads.WaitClear() |
| s.Discovery.Push(&model.PushRequest{ |
| Full: true, |
| ConfigsUpdated: map[model.ConfigKey]struct{}{ |
| {Name: "foo.bar", Namespace: "default", Kind: gvk.ServiceEntry}: {}, |
| {Name: "destall", Namespace: "testns", Kind: gvk.DestinationRule}: {}, |
| }, |
| }) |
| if _, err := ads.Wait(time.Second*5, watchAll...); err != nil { |
| t.Fatal(err) |
| } |
| if len(ads.GetEndpoints()) != 4 { |
| t.Fatalf("Expected a full EDS update, but got: %v", xdstest.MapKeys(ads.GetEndpoints())) |
| } |
| }) |
| t.Run("Full Push without updated services", func(t *testing.T) { |
| ads.WaitClear() |
| s.Discovery.Push(&model.PushRequest{ |
| Full: true, |
| ConfigsUpdated: map[model.ConfigKey]struct{}{ |
| {Name: "destall", Namespace: "testns", Kind: gvk.DestinationRule}: {}, |
| }, |
| }) |
| if _, err := ads.Wait(time.Second*5, v3.ClusterType, v3.EndpointType); err != nil { |
| t.Fatal(err) |
| } |
| if len(ads.GetEndpoints()) < 3 { |
| t.Fatalf("Expected a full EDS update, but got: %v", ads.GetEndpoints()) |
| } |
| }) |
| } |
| |
| func TestEds(t *testing.T) { |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{ |
| ConfigString: mustReadFile(t, "tests/testdata/config/destination-rule-locality.yaml"), |
| DiscoveryServerModifier: func(s *xds.DiscoveryServer) { |
| addUdsEndpoint(s) |
| |
| // Add locality-aware endpoints so that locality load balancing can be tested. |
| addLocalityEndpoints(s, "locality.cluster.local") |
| addLocalityEndpoints(s, "locality-no-outlier-detection.cluster.local") |
| |
| // Add the test ADS clients to the list of service instances in order to test the context-dependent locality coloring. |
| addTestClientEndpoints(s) |
| |
| s.MemRegistry.AddHTTPService(edsIncSvc, edsIncVip, 8080) |
| s.MemRegistry.SetEndpoints(edsIncSvc, "", |
| newEndpointWithAccount("127.0.0.1", "hello-sa", "v1")) |
| }, |
| }) |
| |
| adscConn := s.Connect(&model.Proxy{IPAddresses: []string{"10.10.10.10"}}, nil, watchAll) |
| adscConn2 := s.Connect(&model.Proxy{IPAddresses: []string{"10.10.10.11"}}, nil, watchAll) |
| |
| t.Run("TCPEndpoints", func(t *testing.T) { |
| testTCPEndpoints("127.0.0.1", adscConn, t) |
| }) |
| t.Run("edsz", func(t *testing.T) { |
| testEdsz(t, s, "test-1.default") |
| }) |
| t.Run("LocalityPrioritizedEndpoints", func(t *testing.T) { |
| testLocalityPrioritizedEndpoints(adscConn, adscConn2, t) |
| }) |
| t.Run("UDSEndpoints", func(t *testing.T) { |
| testUdsEndpoints(adscConn, t) |
| }) |
| t.Run("PushIncremental", func(t *testing.T) { |
| edsUpdateInc(s, adscConn, t) |
| }) |
| t.Run("Push", func(t *testing.T) { |
| edsUpdates(s, adscConn, t) |
| }) |
| t.Run("MultipleRequest", func(t *testing.T) { |
| multipleRequest(s, false, 20, 5, 25*time.Second, nil, t) |
| }) |
| // 5 pushes for 20 clients, using EDS incremental only. |
| t.Run("MultipleRequestIncremental", func(t *testing.T) { |
| multipleRequest(s, true, 20, 5, 25*time.Second, nil, t) |
| }) |
| t.Run("CDSSave", func(t *testing.T) { |
| // Moved from cds_test, using new client |
| clusters := adscConn.GetClusters() |
| if len(clusters) == 0 { |
| t.Error("No clusters in ADS response") |
| } |
| strResponse, _ := json.MarshalIndent(clusters, " ", " ") |
| _ = os.WriteFile(env.IstioOut+"/cdsv2_sidecar.json", strResponse, 0o644) |
| }) |
| } |
| |
| // newEndpointWithAccount is a helper for IstioEndpoint creation. It creates a single endpoint with |
| // port name "http-main", the given IP and service account, and a 'version' label. |
| // nolint: unparam |
| func newEndpointWithAccount(ip, account, version string) []*model.IstioEndpoint { |
| return []*model.IstioEndpoint{ |
| { |
| Address: ip, |
| ServicePortName: "http-main", |
| EndpointPort: 80, |
| Labels: map[string]string{"version": version}, |
| ServiceAccount: account, |
| }, |
| } |
| } |
| |
| func TestTunnelServerEndpointEds(t *testing.T) { |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{}) |
| s.Discovery.MemRegistry.AddHTTPService(edsIncSvc, edsIncVip, 8080) |
| s.Discovery.MemRegistry.SetEndpoints(edsIncSvc, "", |
| []*model.IstioEndpoint{ |
| { |
| Address: "127.0.0.1", |
| ServicePortName: "http-main", |
| EndpointPort: 80, |
| // Labels: map[string]string{"version": version}, |
| ServiceAccount: "hello-sa", |
| TunnelAbility: networking.MakeTunnelAbility(networking.H2Tunnel), |
| }, |
| }) |
| |
| t.Run("TestClientWantsTunnelEndpoints", func(t *testing.T) { |
| t.Helper() |
| adscConn1 := s.Connect(&model.Proxy{IPAddresses: []string{"10.10.10.10"}, Metadata: &model.NodeMetadata{ |
| ProxyConfig: &model.NodeMetaProxyConfig{ |
| ProxyMetadata: map[string]string{ |
| "tunnel": networking.H2TunnelTypeName, |
| }, |
| }, |
| }}, nil, watchAll) |
| testTunnelEndpoints("127.0.0.1", 15009, adscConn1, t) |
| }) |
| t.Run("TestClientWantsNoTunnelEndpoints", func(t *testing.T) { |
| t.Helper() |
| adscConn2 := s.Connect(&model.Proxy{IPAddresses: []string{"10.10.10.11"}, Metadata: &model.NodeMetadata{ |
| ProxyConfig: &model.NodeMetaProxyConfig{}, |
| }}, nil, watchAll) |
| testTunnelEndpoints("127.0.0.1", 80, adscConn2, t) |
| }) |
| } |
| |
| func TestNoTunnelServerEndpointEds(t *testing.T) { |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{}) |
| |
| // Add the test ADS clients to the list of service instances in order to test the context-dependent locality coloring. |
| addTestClientEndpoints(s.Discovery) |
| |
| s.Discovery.MemRegistry.AddHTTPService(edsIncSvc, edsIncVip, 8080) |
| s.Discovery.MemRegistry.SetEndpoints(edsIncSvc, "", |
| []*model.IstioEndpoint{ |
| { |
| Address: "127.0.0.1", |
| ServicePortName: "http-main", |
| EndpointPort: 80, |
| // Labels: map[string]string{"version": version}, |
| ServiceAccount: "hello-sa", |
| // No Tunnel Support at this endpoint. |
| TunnelAbility: networking.MakeTunnelAbility(), |
| }, |
| }) |
| |
| t.Run("TestClientWantsTunnelEndpoints", func(t *testing.T) { |
| adscConn := s.Connect(&model.Proxy{IPAddresses: []string{"10.10.10.10"}, Metadata: &model.NodeMetadata{ |
| ProxyConfig: &model.NodeMetaProxyConfig{ |
| ProxyMetadata: map[string]string{ |
| "tunnel": networking.H2TunnelTypeName, |
| }, |
| }, |
| }}, nil, watchAll) |
| testTunnelEndpoints("127.0.0.1", 80, adscConn, t) |
| }) |
| |
| t.Run("TestClientWantsNoTunnelEndpoints", func(t *testing.T) { |
| adscConn := s.Connect(&model.Proxy{IPAddresses: []string{"10.10.10.11"}, Metadata: &model.NodeMetadata{}}, nil, watchAll) |
| testTunnelEndpoints("127.0.0.1", 80, adscConn, t) |
| }) |
| } |
| |
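| // mustReadFile reads the given files and concatenates their contents, prefixing each with a |
| // '---' document separator. Paths not starting with "." are resolved relative to env.IstioSrc; |
| // any read error fails the test. |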
| func mustReadFile(t *testing.T, fpaths ...string) string { |
| result := "" |
| for _, fpath := range fpaths { |
| if !strings.HasPrefix(fpath, ".") { |
| fpath = filepath.Join(env.IstioSrc, fpath) |
| } |
| bytes, err := os.ReadFile(fpath) |
| if err != nil { |
| t.Fatal(err) |
| } |
| result += "---\n" |
| result += string(bytes) |
| } |
| return result |
| } |
| |
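| // mustReadfolder reads every file in the given folder and concatenates the contents, prefixing |
| // each with a '---' document separator. Folders not starting with "." are resolved relative to env.IstioSrc. |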
| func mustReadfolder(t *testing.T, folder string) string { |
| result := "" |
| fpathRoot := folder |
| if !strings.HasPrefix(fpathRoot, ".") { |
| fpathRoot = filepath.Join(env.IstioSrc, folder) |
| } |
| f, err := os.ReadDir(fpathRoot) |
| if err != nil { |
| t.Fatal(err) |
| } |
| for _, fpath := range f { |
| bytes, err := os.ReadFile(filepath.Join(fpathRoot, fpath.Name())) |
| if err != nil { |
| t.Fatal(err) |
| } |
| result += "---\n" |
| result += string(bytes) |
| } |
| return result |
| } |
| |
| func TestEdsWeightedServiceEntry(t *testing.T) { |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{ConfigString: mustReadFile(t, "tests/testdata/config/static-weighted-se.yaml")}) |
| adscConn := s.Connect(nil, nil, watchEds) |
| endpoints := adscConn.GetEndpoints() |
| lbe, f := endpoints["outbound|80||weighted.static.svc.cluster.local"] |
| if !f || len(lbe.Endpoints) == 0 { |
| t.Fatalf("No lb endpoints for %v, %v", "outbound|80||weighted.static.svc.cluster.local", adscConn.EndpointsJSON()) |
| } |
| expected := map[string]uint32{ |
| "a": 9, // sum of 1 and 8 |
| "b": 3, |
| "3.3.3.3": 1, // no weight provided is normalized to 1 |
| "2.2.2.2": 8, |
| "1.1.1.1": 3, |
| } |
| got := make(map[string]uint32) |
| for _, lbe := range lbe.Endpoints { |
| got[lbe.Locality.Region] = lbe.LoadBalancingWeight.Value |
| for _, e := range lbe.LbEndpoints { |
| got[e.GetEndpoint().Address.GetSocketAddress().Address] = e.LoadBalancingWeight.Value |
| } |
| } |
| if !reflect.DeepEqual(expected, got) { |
| t.Errorf("Expected LB weights %v got %v", expected, got) |
| } |
| } |
| |
| var ( |
| watchEds = []string{v3.ClusterType, v3.EndpointType} |
| watchAll = []string{v3.ClusterType, v3.EndpointType, v3.ListenerType, v3.RouteType} |
| ) |
| |
| func TestEDSOverlapping(t *testing.T) { |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{}) |
| addOverlappingEndpoints(s) |
| adscon := s.Connect(nil, nil, watchEds) |
| testOverlappingPorts(s, adscon, t) |
| } |
| |
| func TestEDSUnhealthyEndpoints(t *testing.T) { |
| test.SetBoolForTest(t, &features.SendUnhealthyEndpoints, true) |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{}) |
| addUnhealthyCluster(s) |
| adscon := s.Connect(nil, nil, watchEds) |
| _, err := adscon.Wait(5 * time.Second) |
| if err != nil { |
| t.Fatalf("Error in push %v", err) |
| } |
| |
| validateEndpoints := func(expectPush bool, healthy []string, unhealthy []string) { |
| t.Helper() |
| // Normalize lists to make comparison easier |
| if healthy == nil { |
| healthy = []string{} |
| } |
| if unhealthy == nil { |
| unhealthy = []string{} |
| } |
| sort.Strings(healthy) |
| sort.Strings(unhealthy) |
| if expectPush { |
| upd, _ := adscon.Wait(5*time.Second, v3.EndpointType) |
| |
| if len(upd) > 0 && !contains(upd, v3.EndpointType) { |
| t.Fatalf("Expecting EDS push as endpoint health is changed. But received %v", upd) |
| } |
| } else { |
| upd, _ := adscon.Wait(50*time.Millisecond, v3.EndpointType) |
| if contains(upd, v3.EndpointType) { |
| t.Fatalf("Expected no EDS push, got %v", upd) |
| } |
| } |
| |
| // Validate that endpoints are pushed. |
| lbe := adscon.GetEndpoints()["outbound|53||unhealthy.svc.cluster.local"] |
| eh, euh := xdstest.ExtractHealthEndpoints(lbe) |
| gotHealthy := sets.New(eh...).SortedList() |
| gotUnhealthy := sets.New(euh...).SortedList() |
| if !reflect.DeepEqual(gotHealthy, healthy) { |
| t.Fatalf("did not get expected endpoints: got %v, want %v", gotHealthy, healthy) |
| } |
| if !reflect.DeepEqual(gotUnhealthy, unhealthy) { |
| t.Fatalf("did not get expected unhealthy endpoints: got %v, want %v", gotUnhealthy, unhealthy) |
| } |
| } |
| |
| // Validate that we do not send initial unhealthy endpoints. |
| validateEndpoints(false, nil, nil) |
| adscon.WaitClear() |
| |
| // Set additional unhealthy endpoints and validate that no EDS update is triggered. |
| s.Discovery.MemRegistry.SetEndpoints("unhealthy.svc.cluster.local", "", |
| []*model.IstioEndpoint{ |
| { |
| Address: "10.0.0.53", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.UnHealthy, |
| }, |
| { |
| Address: "10.0.0.54", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.UnHealthy, |
| }, |
| }) |
| |
| // Validate that no endpoint update is pushed. |
| validateEndpoints(false, nil, nil) |
| |
| // Change the status of the endpoints to Healthy and validate that EDS is pushed. |
| s.Discovery.MemRegistry.SetEndpoints("unhealthy.svc.cluster.local", "", |
| []*model.IstioEndpoint{ |
| { |
| Address: "10.0.0.53", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.Healthy, |
| }, |
| { |
| Address: "10.0.0.54", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.Healthy, |
| }, |
| }) |
| |
| // Validate that endpoints are pushed. |
| validateEndpoints(true, []string{"10.0.0.53:53", "10.0.0.54:53"}, nil) |
| |
| // Set the exact same endpoints again. |
| s.Discovery.MemRegistry.SetEndpoints("unhealthy.svc.cluster.local", "", |
| []*model.IstioEndpoint{ |
| { |
| Address: "10.0.0.53", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.Healthy, |
| }, |
| { |
| Address: "10.0.0.54", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.Healthy, |
| }, |
| }) |
| // Validate that no endpoint update is pushed. |
| validateEndpoints(false, []string{"10.0.0.53:53", "10.0.0.54:53"}, nil) |
| |
| // Now change the status of one endpoint to UnHealthy and validate that EDS is pushed. |
| s.Discovery.MemRegistry.SetEndpoints("unhealthy.svc.cluster.local", "", |
| []*model.IstioEndpoint{ |
| { |
| Address: "10.0.0.53", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.UnHealthy, |
| }, |
| { |
| Address: "10.0.0.54", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.Healthy, |
| }, |
| }) |
| |
| // Validate that endpoints are pushed. |
| validateEndpoints(true, []string{"10.0.0.54:53"}, []string{"10.0.0.53:53"}) |
| |
| // Change the status of the endpoint back to Healthy and validate that EDS is pushed. |
| s.Discovery.MemRegistry.SetEndpoints("unhealthy.svc.cluster.local", "", |
| []*model.IstioEndpoint{ |
| { |
| Address: "10.0.0.53", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.Healthy, |
| }, |
| { |
| Address: "10.0.0.54", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.Healthy, |
| }, |
| }) |
| |
| validateEndpoints(true, []string{"10.0.0.54:53", "10.0.0.53:53"}, nil) |
| |
| // Remove a healthy endpoint |
| s.Discovery.MemRegistry.SetEndpoints("unhealthy.svc.cluster.local", "", |
| []*model.IstioEndpoint{ |
| { |
| Address: "10.0.0.53", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.Healthy, |
| }, |
| }) |
| |
| validateEndpoints(true, []string{"10.0.0.53:53"}, nil) |
| |
| // Remove last healthy endpoint |
| s.Discovery.MemRegistry.SetEndpoints("unhealthy.svc.cluster.local", "", []*model.IstioEndpoint{}) |
| validateEndpoints(true, nil, nil) |
| } |
| |
| // Validates the behavior when Service resolution type is updated after initial EDS push. |
| // See https://github.com/istio/istio/issues/18355 for more details. |
| func TestEDSServiceResolutionUpdate(t *testing.T) { |
| for _, resolution := range []model.Resolution{model.DNSLB, model.DNSRoundRobinLB} { |
| t.Run(fmt.Sprintf("resolution_%s", resolution), func(t *testing.T) { |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{}) |
| addEdsCluster(s, "edsdns.svc.cluster.local", "http", "10.0.0.53", 8080) |
| addEdsCluster(s, "other.local", "http", "1.1.1.1", 8080) |
| |
| adscConn := s.Connect(nil, nil, watchAll) |
| |
| // Validate that endpoints are pushed correctly. |
| testEndpoints("10.0.0.53", "outbound|8080||edsdns.svc.cluster.local", adscConn, t) |
| |
| // Now update the service resolution to DNSLB/DNSRRLB with a DNS endpoint. |
| updateServiceResolution(s, resolution) |
| |
| if _, err := adscConn.Wait(5*time.Second, v3.EndpointType); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Validate that endpoints are skipped. |
| lbe := adscConn.GetEndpoints()["outbound|8080||edsdns.svc.cluster.local"] |
| if lbe != nil && len(lbe.Endpoints) > 0 { |
| t.Fatalf("endpoints not expected for %s, but got %v", "edsdns.svc.cluster.local", adscConn.EndpointsJSON()) |
| } |
| }) |
| } |
| } |
| |
| // Validate that a service whose endpoints flip-flop between 1 and 0 does not trigger a full push. |
| func TestEndpointFlipFlops(t *testing.T) { |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{}) |
| addEdsCluster(s, "flipflop.com", "http", "10.0.0.53", 8080) |
| |
| adscConn := s.Connect(nil, nil, watchAll) |
| |
| // Validate that endpoints are pushed correctly. |
| testEndpoints("10.0.0.53", "outbound|8080||flipflop.com", adscConn, t) |
| |
| // Clear the endpoint and validate it does not trigger a full push. |
| s.Discovery.MemRegistry.SetEndpoints("flipflop.com", "", []*model.IstioEndpoint{}) |
| |
| upd, _ := adscConn.Wait(5*time.Second, v3.EndpointType) |
| |
| if contains(upd, "cds") { |
| t.Fatalf("Expecting only EDS update as part of a partial push. But received CDS also %v", upd) |
| } |
| |
| if len(upd) > 0 && !contains(upd, v3.EndpointType) { |
| t.Fatalf("Expecting EDS push as part of a partial push. But received %v", upd) |
| } |
| |
| lbe := adscConn.GetEndpoints()["outbound|8080||flipflop.com"] |
| if len(lbe.Endpoints) != 0 { |
| t.Fatalf("There should be no endpoints for outbound|8080||flipflop.com. Endpoints:\n%v", adscConn.EndpointsJSON()) |
| } |
| |
| // Validate that the service key still exists in the EndpointIndex - this is what prevents a full push. |
| if _, ok := s.Discovery.EndpointIndex.ShardsForService("flipflop.com", ""); !ok { |
| t.Fatalf("Expected service key %s to be present in EndpointIndex. But missing %v", "flipflop.com", s.Discovery.EndpointIndex.Shardz()) |
| } |
| |
| // Set the endpoints again and validate it does not trigger full push. |
| s.Discovery.MemRegistry.SetEndpoints("flipflop.com", "", |
| []*model.IstioEndpoint{ |
| { |
| Address: "10.10.1.1", |
| ServicePortName: "http", |
| EndpointPort: 8080, |
| }, |
| }) |
| |
| upd, _ = adscConn.Wait(5*time.Second, v3.EndpointType) |
| |
| if contains(upd, v3.ClusterType) { |
| t.Fatalf("expecting only EDS update as part of a partial push. But received CDS also %+v", upd) |
| } |
| |
| if len(upd) > 0 && !contains(upd, v3.EndpointType) { |
| t.Fatalf("expecting EDS push as part of a partial push. But did not receive %+v", upd) |
| } |
| |
| testEndpoints("10.10.1.1", "outbound|8080||flipflop.com", adscConn, t) |
| } |
| |
| // Validate that deleting a service clears entries from EndpointIndex. |
| func TestDeleteService(t *testing.T) { |
| s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{}) |
| addEdsCluster(s, "removeservice.com", "http", "10.0.0.53", 8080) |
| |
| adscConn := s.Connect(nil, nil, watchEds) |
| |
| // Validate that endpoints are pushed correctly. |
| testEndpoints("10.0.0.53", "outbound|8080||removeservice.com", adscConn, t) |
| |
| s.Discovery.MemRegistry.RemoveService("removeservice.com") |
| |
| if _, ok := s.Discovery.EndpointIndex.ShardsForService("removeservice.com", ""); ok { |
| t.Fatalf("Expected service key %s to be deleted in EndpointIndex. But is still there %v", |
| "removeservice.com", s.Discovery.EndpointIndex.Shardz()) |
| } |
| } |
| |
| var ( |
| c1Key = model.ShardKey{Cluster: "c1"} |
| c2Key = model.ShardKey{Cluster: "c2"} |
| ) |
| |
| func TestUpdateServiceAccount(t *testing.T) { |
| cluster1Endpoints := []*model.IstioEndpoint{ |
| {Address: "10.172.0.1", ServiceAccount: "sa1"}, |
| {Address: "10.172.0.2", ServiceAccount: "sa-vm1"}, |
| } |
| |
| testCases := []struct { |
| name string |
| shardKey model.ShardKey |
| endpoints []*model.IstioEndpoint |
| expect bool |
| }{ |
| { |
| name: "added new endpoint", |
| shardKey: c1Key, |
| endpoints: append(cluster1Endpoints, &model.IstioEndpoint{Address: "10.172.0.3", ServiceAccount: "sa1"}), |
| expect: false, |
| }, |
| { |
| name: "added new sa", |
| shardKey: c1Key, |
| endpoints: append(cluster1Endpoints, &model.IstioEndpoint{Address: "10.172.0.3", ServiceAccount: "sa2"}), |
| expect: true, |
| }, |
| { |
| name: "updated endpoints address", |
| shardKey: c1Key, |
| endpoints: []*model.IstioEndpoint{ |
| {Address: "10.172.0.5", ServiceAccount: "sa1"}, |
| {Address: "10.172.0.2", ServiceAccount: "sa-vm1"}, |
| }, |
| expect: false, |
| }, |
| { |
| name: "deleted one endpoint with unique sa", |
| shardKey: c1Key, |
| endpoints: []*model.IstioEndpoint{ |
| {Address: "10.172.0.1", ServiceAccount: "sa1"}, |
| }, |
| expect: true, |
| }, |
| { |
| name: "deleted one endpoint with duplicate sa", |
| shardKey: c1Key, |
| endpoints: []*model.IstioEndpoint{ |
| {Address: "10.172.0.2", ServiceAccount: "sa-vm1"}, |
| }, |
| expect: false, |
| }, |
| { |
| name: "deleted endpoints", |
| shardKey: c1Key, |
| endpoints: nil, |
| expect: true, |
| }, |
| } |
| |
| for _, tc := range testCases { |
| t.Run(tc.name, func(t *testing.T) { |
| s := new(xds.DiscoveryServer) |
| originalEndpointsShard := &model.EndpointShards{ |
| Shards: map[model.ShardKey][]*model.IstioEndpoint{ |
| c1Key: cluster1Endpoints, |
| c2Key: {{Address: "10.244.0.1", ServiceAccount: "sa1"}, {Address: "10.244.0.2", ServiceAccount: "sa-vm2"}}, |
| }, |
| ServiceAccounts: map[string]struct{}{ |
| "sa1": {}, |
| "sa-vm1": {}, |
| "sa-vm2": {}, |
| }, |
| } |
| originalEndpointsShard.Shards[tc.shardKey] = tc.endpoints |
| ret := s.UpdateServiceAccount(originalEndpointsShard, "test-svc") |
| if ret != tc.expect { |
| t.Errorf("expect UpdateServiceAccount %v, but got %v", tc.expect, ret) |
| } |
| }) |
| } |
| } |
| |
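| // TestZeroEndpointShardSA verifies that a shard's service accounts are cleared once all of its |
| // endpoints are removed. |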
| func TestZeroEndpointShardSA(t *testing.T) { |
| cluster1Endpoints := []*model.IstioEndpoint{ |
| {Address: "10.172.0.1", ServiceAccount: "sa1"}, |
| } |
| s := new(xds.DiscoveryServer) |
| s.Cache = model.DisabledCache{} |
| s.EndpointIndex = model.NewEndpointIndex() |
| originalEndpointsShard, _ := s.EndpointIndex.GetOrCreateEndpointShard("test", "test") |
| originalEndpointsShard.Shards = map[model.ShardKey][]*model.IstioEndpoint{ |
| c1Key: cluster1Endpoints, |
| } |
| originalEndpointsShard.ServiceAccounts = map[string]struct{}{ |
| "sa1": {}, |
| } |
| s.EDSCacheUpdate(c1Key, "test", "test", []*model.IstioEndpoint{}) |
| modifiedShard, _ := s.EndpointIndex.GetOrCreateEndpointShard("test", "test") |
| if len(modifiedShard.ServiceAccounts) != 0 { |
| t.Errorf("endpoint shard service accounts got %v want 0", len(modifiedShard.ServiceAccounts)) |
| } |
| } |
| |
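| // fullPush triggers a full xDS push on the fake discovery server. |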
| func fullPush(s *xds.FakeDiscoveryServer) { |
| s.Discovery.Push(&model.PushRequest{Full: true}) |
| } |
| |
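| // addTestClientEndpoints registers the test-1.default service with one instance per test ADS client IP, |
| // each placed in a different locality, so the context-dependent locality coloring can be verified. |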
| func addTestClientEndpoints(server *xds.DiscoveryServer) { |
| server.MemRegistry.AddService(&model.Service{ |
| Hostname: "test-1.default", |
| Ports: model.PortList{ |
| { |
| Name: "http", |
| Port: 80, |
| Protocol: protocol.HTTP, |
| }, |
| }, |
| }) |
| server.MemRegistry.AddInstance("test-1.default", &model.ServiceInstance{ |
| Endpoint: &model.IstioEndpoint{ |
| Address: "10.10.10.10", |
| ServicePortName: "http", |
| EndpointPort: 80, |
| Locality: model.Locality{Label: asdcLocality}, |
| }, |
| ServicePort: &model.Port{ |
| Name: "http", |
| Port: 80, |
| Protocol: protocol.HTTP, |
| }, |
| }) |
| server.MemRegistry.AddInstance("test-1.default", &model.ServiceInstance{ |
| Endpoint: &model.IstioEndpoint{ |
| Address: "10.10.10.11", |
| ServicePortName: "http", |
| EndpointPort: 80, |
| Locality: model.Locality{Label: asdc2Locality}, |
| }, |
| ServicePort: &model.Port{ |
| Name: "http", |
| Port: 80, |
| Protocol: protocol.HTTP, |
| }, |
| }) |
| } |
| |
| // Verify the server sends the endpoint. This checks for a single endpoint with the given |
| // address. |
| func testTCPEndpoints(expected string, adsc *adsc.ADSC, t *testing.T) { |
| t.Helper() |
| testEndpoints(expected, "outbound|8080||eds.test.svc.cluster.local", adsc, t) |
| } |
| |
| // Verify the server sends the endpoint. This checks for a single endpoint with the given |
| // address. |
| func testEndpoints(expected string, cluster string, adsc *adsc.ADSC, t *testing.T) { |
| t.Helper() |
| lbe, f := adsc.GetEndpoints()[cluster] |
| if !f || len(lbe.Endpoints) == 0 { |
| t.Fatalf("No lb endpoints for %v, %v", cluster, adsc.EndpointsJSON()) |
| } |
| var found []string |
| for _, lbe := range lbe.Endpoints { |
| for _, e := range lbe.LbEndpoints { |
| addr := e.GetEndpoint().Address.GetSocketAddress().Address |
| found = append(found, addr) |
| if expected == addr { |
| return |
| } |
| } |
| } |
| t.Fatalf("Expecting %s got %v", expected, found) |
| } |
| |
| // Verify server sends the tunneled endpoints. |
| // nolint: unparam |
| func testTunnelEndpoints(expectIP string, expectPort uint32, adsc *adsc.ADSC, t *testing.T) { |
| t.Helper() |
| cluster := "outbound|8080||eds.test.svc.cluster.local" |
| allClusters := adsc.GetEndpoints() |
| cla, f := allClusters[cluster] |
| if !f || len(cla.Endpoints) == 0 { |
| t.Fatalf("No lb endpoints for %v, %v", cluster, adsc.EndpointsJSON()) |
| } |
| var found []string |
| for _, lbe := range cla.Endpoints { |
| for _, e := range lbe.LbEndpoints { |
| addr := e.GetEndpoint().Address.GetSocketAddress().Address |
| port := e.GetEndpoint().Address.GetSocketAddress().GetPortValue() |
| found = append(found, fmt.Sprintf("%s:%d", addr, port)) |
| if expectIP == addr && expectPort == port { |
| return |
| } |
| } |
| } |
| t.Errorf("REACH HERE cannot find %s:%d", expectIP, expectPort) |
| t.Fatalf("Expecting address %s:%d got %v", expectIP, expectPort, found) |
| } |
| |
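| // testLocalityPrioritizedEndpoints verifies that endpoint priorities are assigned relative to each |
| // proxy's locality, and that no priorities are applied for the cluster without outlier detection. |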
| func testLocalityPrioritizedEndpoints(adsc *adsc.ADSC, adsc2 *adsc.ADSC, t *testing.T) { |
| endpoints1 := adsc.GetEndpoints() |
| endpoints2 := adsc2.GetEndpoints() |
| |
| verifyLocalityPriorities(asdcLocality, endpoints1["outbound|80||locality.cluster.local"].GetEndpoints(), t) |
| verifyLocalityPriorities(asdc2Locality, endpoints2["outbound|80||locality.cluster.local"].GetEndpoints(), t) |
| |
| // No outlier detection specified for this cluster, so we shouldn't apply priority. |
| verifyNoLocalityPriorities(endpoints1["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t) |
| verifyNoLocalityPriorities(endpoints2["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t) |
| } |
| |
| // Tests that endpoints are sent properly for Services that define multiple ports sharing the same port number. |
| // The real-world use case for this is kube-dns, which uses port 53 for both TCP and UDP. |
| func testOverlappingPorts(s *xds.FakeDiscoveryServer, adsc *adsc.ADSC, t *testing.T) { |
| // Test initial state |
| testEndpoints("10.0.0.53", "outbound|53||overlapping.cluster.local", adsc, t) |
| |
| s.Discovery.Push(&model.PushRequest{ |
| Full: true, |
| ConfigsUpdated: map[model.ConfigKey]struct{}{{ |
| Kind: gvk.ServiceEntry, |
| Name: "overlapping.cluster.local", |
| }: {}}, |
| }) |
| _, _ = adsc.Wait(5 * time.Second) |
| |
| // After the push, we should still see the endpoint |
| testEndpoints("10.0.0.53", "outbound|53||overlapping.cluster.local", adsc, t) |
| } |
| |
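| // verifyNoLocalityPriorities asserts that every endpoint pool has the default priority of 0. |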
| func verifyNoLocalityPriorities(eps []*endpoint.LocalityLbEndpoints, t *testing.T) { |
| for _, ep := range eps { |
| if ep.GetPriority() != 0 { |
| t.Errorf("expected no locality priorities to apply, got priority %v.", ep.GetPriority()) |
| } |
| } |
| } |
| |
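| // verifyLocalityPriorities asserts that endpoint pools are prioritized by proximity to the proxy |
| // locality: same subzone -> 0, different subzone -> 1, different zone -> 2, different region -> 3. |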
| func verifyLocalityPriorities(proxyLocality string, eps []*endpoint.LocalityLbEndpoints, t *testing.T) { |
| items := strings.SplitN(proxyLocality, "/", 3) |
| region, zone, subzone := items[0], items[1], items[2] |
| for _, ep := range eps { |
| if ep.GetLocality().Region == region { |
| if ep.GetLocality().Zone == zone { |
| if ep.GetLocality().SubZone == subzone { |
| if ep.GetPriority() != 0 { |
| t.Errorf("expected endpoint pool from same locality to have priority of 0, got %v", ep.GetPriority()) |
| } |
| } else if ep.GetPriority() != 1 { |
| t.Errorf("expected endpoint pool from a different subzone to have priority of 1, got %v", ep.GetPriority()) |
| } |
| } else { |
| if ep.GetPriority() != 2 { |
| t.Errorf("expected endpoint pool from a different zone to have priority of 2, got %v", ep.GetPriority()) |
| } |
| } |
| } else { |
| if ep.GetPriority() != 3 { |
| t.Errorf("expected endpoint pool from a different region to have priority of 3, got %v", ep.GetPriority()) |
| } |
| } |
| } |
| } |
| |
| // Verify server sends UDS endpoints |
| func testUdsEndpoints(adsc *adsc.ADSC, t *testing.T) { |
| // Check the UDS endpoint (this used to be a separate test, but it relied on an old, unused gRPC method). |
| // The new test also verifies CDS is pushing the UDS cluster, since adsc.eds is |
| // populated using the CDS response. |
| lbe, f := adsc.GetEndpoints()["outbound|0||localuds.cluster.local"] |
| if !f || len(lbe.Endpoints) == 0 { |
| t.Error("No UDS lb endpoints") |
| } else { |
| ep0 := lbe.Endpoints[0] |
| if len(ep0.LbEndpoints) != 1 { |
| t.Fatalf("expected 1 LB endpoint but got %d", len(ep0.LbEndpoints)) |
| } |
| lbep := ep0.LbEndpoints[0] |
| path := lbep.GetEndpoint().GetAddress().GetPipe().GetPath() |
| if path != udsPath { |
| t.Fatalf("expected Pipe to %s, got %s", udsPath, path) |
| } |
| } |
| } |
| |
| // edsUpdates triggers a full (non-incremental) EDS update by changing registry endpoints and verifies the new endpoint is pushed. |
| func edsUpdates(s *xds.FakeDiscoveryServer, adsc *adsc.ADSC, t *testing.T) { |
| // Old style (non-incremental) |
| s.Discovery.MemRegistry.SetEndpoints(edsIncSvc, "", |
| newEndpointWithAccount("127.0.0.3", "hello-sa", "v1")) |
| |
| xds.AdsPushAll(s.Discovery) |
| |
| // will trigger recompute and push |
| |
| if _, err := adsc.Wait(5*time.Second, v3.EndpointType); err != nil { |
| t.Fatal("EDS push failed", err) |
| } |
| testTCPEndpoints("127.0.0.3", adsc, t) |
| } |
| |
| // edsFullUpdateCheck checks for updates required in a full push after the CDS update |
| func edsFullUpdateCheck(adsc *adsc.ADSC, t *testing.T) { |
| t.Helper() |
| if upd, err := adsc.Wait(15*time.Second, watchAll...); err != nil { |
| t.Fatal("Expecting CDS, EDS, LDS, and RDS update as part of a full push", err, upd) |
| } |
| } |
| |
| // This test must be run in isolation, can't be parallelized with any other v2 test. |
| // It makes different kinds of updates and checks whether an incremental or a full push happens. |
| // In particular: |
| // - just endpoint changes -> incremental |
| // - service account changes -> full (in future: CDS only) |
| // - label changes -> full |
| func edsUpdateInc(s *xds.FakeDiscoveryServer, adsc *adsc.ADSC, t *testing.T) { |
| // TODO: set endpoints for a different cluster (new shard) |
| |
| // Verify initial state |
| testTCPEndpoints("127.0.0.1", adsc, t) |
| |
| adsc.WaitClear() // make sure there are no pending pushes. |
| |
| // Equivalent to the event generated by K8S watching the Service. |
| // Will trigger a push. |
| s.Discovery.MemRegistry.SetEndpoints(edsIncSvc, "", |
| newEndpointWithAccount("127.0.0.2", "hello-sa", "v1")) |
| |
| upd, err := adsc.Wait(5*time.Second, v3.EndpointType) |
| if err != nil { |
| t.Fatal("Incremental push failed", err) |
| } |
| if contains(upd, v3.ClusterType) { |
| t.Fatal("Expecting EDS only update, got", upd) |
| } |
| |
| testTCPEndpoints("127.0.0.2", adsc, t) |
| |
| // Update the endpoint with different SA - expect full |
| s.Discovery.MemRegistry.SetEndpoints(edsIncSvc, "", |
| newEndpointWithAccount("127.0.0.2", "account2", "v1")) |
| |
| edsFullUpdateCheck(adsc, t) |
| testTCPEndpoints("127.0.0.2", adsc, t) |
| |
| // Update the endpoint again, no SA change - expect incremental |
| s.Discovery.MemRegistry.SetEndpoints(edsIncSvc, "", |
| newEndpointWithAccount("127.0.0.4", "account2", "v1")) |
| |
| upd, err = adsc.Wait(5 * time.Second) |
| if err != nil { |
| t.Fatal("Incremental push failed", err) |
| } |
| if !reflect.DeepEqual(upd, []string{v3.EndpointType}) { |
| t.Fatal("Expecting EDS only update, got", upd) |
| } |
| testTCPEndpoints("127.0.0.4", adsc, t) |
| |
| // Update the endpoint to original SA - expect full |
| s.Discovery.MemRegistry.SetEndpoints(edsIncSvc, "", |
| newEndpointWithAccount("127.0.0.2", "hello-sa", "v1")) |
| edsFullUpdateCheck(adsc, t) |
| testTCPEndpoints("127.0.0.2", adsc, t) |
| |
| // Update the endpoint again, no label change - expect incremental |
| s.Discovery.MemRegistry.SetEndpoints(edsIncSvc, "", |
| newEndpointWithAccount("127.0.0.5", "hello-sa", "v1")) |
| |
| upd, err = adsc.Wait(5 * time.Second) |
| if err != nil { |
| t.Fatal("Incremental push failed", err) |
| } |
| if !reflect.DeepEqual(upd, []string{v3.EndpointType}) { |
| t.Fatal("Expecting EDS only update, got", upd) |
| } |
| testTCPEndpoints("127.0.0.5", adsc, t) |
| |
| // Wipe out all endpoints - expect full |
| s.Discovery.MemRegistry.SetEndpoints(edsIncSvc, "", []*model.IstioEndpoint{}) |
| |
| if upd, err := adsc.Wait(15*time.Second, v3.EndpointType); err != nil { |
| t.Fatal("Expecting EDS update as part of a partial push", err, upd) |
| } |
| |
| lbe := adsc.GetEndpoints()["outbound|8080||eds.test.svc.cluster.local"] |
| if len(lbe.Endpoints) != 0 { |
| t.Fatalf("There should be no endpoints for outbound|8080||eds.test.svc.cluster.local. Endpoints:\n%v", adsc.EndpointsJSON()) |
| } |
| } |
| |
| // Make a direct EDS gRPC request to pilot and verify the result is as expected. |
| // This test includes a 'bad client' regression test, which fails to read on the |
| // stream. |
| func multipleRequest(s *xds.FakeDiscoveryServer, inc bool, nclients, |
| nPushes int, to time.Duration, _ map[string]string, t *testing.T, |
| ) { |
| wgConnect := &sync.WaitGroup{} |
| wg := &sync.WaitGroup{} |
| errChan := make(chan error, nclients) |
| |
| // Bad client - will not read any response. This triggers Write to block, which should |
| // be detected |
| // This is not using adsc, which consumes the events automatically. |
| ads := s.ConnectADS() |
| ads.Request(t, nil) |
| |
| n := nclients |
| wg.Add(n) |
| wgConnect.Add(n) |
| rcvPush := uatomic.NewInt32(0) |
| rcvClients := uatomic.NewInt32(0) |
| for i := 0; i < n; i++ { |
| current := i |
| go func(id int) { |
| defer wg.Done() |
| // Connect and get initial response |
| adscConn := s.Connect(&model.Proxy{IPAddresses: []string{fmt.Sprintf("1.1.1.%d", id)}}, nil, nil) |
| _, err := adscConn.Wait(15*time.Second, v3.RouteType) |
| if err != nil { |
| errChan <- errors.New("failed to get initial rds: " + err.Error()) |
| wgConnect.Done() |
| return |
| } |
| |
| if len(adscConn.GetEndpoints()) == 0 { |
| errChan <- errors.New("no endpoints") |
| wgConnect.Done() |
| return |
| } |
| |
| wgConnect.Done() |
| |
| // Check we received all pushes |
| log.Info("Waiting for pushes ", id) |
| |
| // Pushes may be merged so we may not get nPushes pushes |
| got, err := adscConn.Wait(15*time.Second, v3.EndpointType) |
| |
| // If in incremental mode, we should not receive CDS, RDS or LDS here - only EDS. |
| if inc { |
| for _, g := range got { |
| if g == v3.ClusterType || g == v3.RouteType || g == v3.ListenerType { |
| errChan <- fmt.Errorf("should be EDS incremental but received %v. %v %v", |
| g, err, id) |
| return |
| } |
| } |
| } |
| |
| rcvPush.Inc() |
| if err != nil { |
| log.Info("Recv failed", err, id) |
| errChan <- fmt.Errorf("failed to receive a response in 15 s %v %v", |
| err, id) |
| return |
| } |
| |
| log.Info("Received all pushes ", id) |
| rcvClients.Inc() |
| |
| adscConn.Close() |
| }(current) |
| } |
| ok := waitTimeout(wgConnect, to) |
| if !ok { |
| t.Fatal("Failed to connect") |
| } |
| log.Info("Done connecting") |
| |
| // All clients are connected - we can now start pushing changes. |
| for j := 0; j < nPushes; j++ { |
| if inc { |
| // This will be throttled - we want to trigger a single push |
| s.Discovery.AdsPushAll(strconv.Itoa(j), &model.PushRequest{ |
| Full: false, |
| ConfigsUpdated: map[model.ConfigKey]struct{}{{ |
| Kind: gvk.ServiceEntry, |
| Name: edsIncSvc, |
| }: {}}, |
| Push: s.Discovery.Env.PushContext, |
| }) |
| } else { |
| xds.AdsPushAll(s.Discovery) |
| } |
| log.Info("Push done ", j) |
| } |
| |
| ok = waitTimeout(wg, to) |
| if !ok { |
| t.Errorf("Failed to receive all responses %d %d", rcvClients.Load(), rcvPush.Load()) |
| buf := make([]byte, 1<<16) |
| runtime.Stack(buf, true) |
| fmt.Printf("%s", buf) |
| } |
| |
| close(errChan) |
| |
| // Moved from ads_test, which had a duplicated test. |
| for e := range errChan { |
| t.Error(e) |
| } |
| } |
| |
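| // waitTimeout waits for the WaitGroup and returns false if the timeout expires before it completes. |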
| func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { |
| c := make(chan struct{}) |
| go func() { |
| defer close(c) |
| wg.Wait() |
| }() |
| select { |
| case <-c: |
| return true |
| case <-time.After(timeout): |
| return false |
| } |
| } |
| |
| const udsPath = "/var/run/test/socket" |
| |
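| // addUdsEndpoint registers the localuds.cluster.local service backed by a Unix domain socket endpoint |
| // and triggers a full config update. |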
| func addUdsEndpoint(s *xds.DiscoveryServer) { |
| s.MemRegistry.AddService(&model.Service{ |
| Hostname: "localuds.cluster.local", |
| Ports: model.PortList{ |
| { |
| Name: "grpc", |
| Port: 0, |
| Protocol: protocol.GRPC, |
| }, |
| }, |
| MeshExternal: true, |
| Resolution: model.ClientSideLB, |
| }) |
| s.MemRegistry.AddInstance("localuds.cluster.local", &model.ServiceInstance{ |
| Endpoint: &model.IstioEndpoint{ |
| Address: udsPath, |
| EndpointPort: 0, |
| ServicePortName: "grpc", |
| Locality: model.Locality{Label: "localhost"}, |
| Labels: map[string]string{"socket": "unix"}, |
| }, |
| ServicePort: &model.Port{ |
| Name: "grpc", |
| Port: 0, |
| Protocol: protocol.GRPC, |
| }, |
| }) |
| |
| pushReq := &model.PushRequest{ |
| Full: true, |
| Reason: []model.TriggerReason{model.ConfigUpdate}, |
| } |
| s.ConfigUpdate(pushReq) |
| } |
| |
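| // addLocalityEndpoints registers the given service with one endpoint per locality in the test locality list. |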
| func addLocalityEndpoints(server *xds.DiscoveryServer, hostname host.Name) { |
| server.MemRegistry.AddService(&model.Service{ |
| Hostname: hostname, |
| Ports: model.PortList{ |
| { |
| Name: "http", |
| Port: 80, |
| Protocol: protocol.HTTP, |
| }, |
| }, |
| }) |
| localities := []string{ |
| "region1/zone1/subzone1", |
| "region1/zone1/subzone2", |
| "region1/zone2/subzone1", |
| "region2/zone1/subzone1", |
| "region2/zone1/subzone2", |
| "region2/zone2/subzone1", |
| "region2/zone2/subzone2", |
| } |
| for i, locality := range localities { |
| server.MemRegistry.AddInstance(hostname, &model.ServiceInstance{ |
| Endpoint: &model.IstioEndpoint{ |
| Address: fmt.Sprintf("10.0.0.%v", i), |
| EndpointPort: 80, |
| ServicePortName: "http", |
| Locality: model.Locality{Label: locality}, |
| }, |
| ServicePort: &model.Port{ |
| Name: "http", |
| Port: 80, |
| Protocol: protocol.HTTP, |
| }, |
| }) |
| } |
| } |
| |
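| // addEdsCluster registers a single-endpoint HTTP service with the given hostname, port name, address and |
| // port, then triggers a full push. |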
| // nolint: unparam |
| func addEdsCluster(s *xds.FakeDiscoveryServer, hostName string, portName string, address string, port int) { |
| s.Discovery.MemRegistry.AddService(&model.Service{ |
| Hostname: host.Name(hostName), |
| Ports: model.PortList{ |
| { |
| Name: portName, |
| Port: port, |
| Protocol: protocol.HTTP, |
| }, |
| }, |
| }) |
| |
| s.Discovery.MemRegistry.AddInstance(host.Name(hostName), &model.ServiceInstance{ |
| Endpoint: &model.IstioEndpoint{ |
| Address: address, |
| EndpointPort: uint32(port), |
| ServicePortName: portName, |
| }, |
| ServicePort: &model.Port{ |
| Name: portName, |
| Port: port, |
| Protocol: protocol.HTTP, |
| }, |
| }) |
| fullPush(s) |
| } |
| |
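| // updateServiceResolution re-registers edsdns.svc.cluster.local with the given resolution and a DNS-name |
| // endpoint, then triggers a full push. |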
| func updateServiceResolution(s *xds.FakeDiscoveryServer, resolution model.Resolution) { |
| s.Discovery.MemRegistry.AddService(&model.Service{ |
| Hostname: "edsdns.svc.cluster.local", |
| Ports: model.PortList{ |
| { |
| Name: "http", |
| Port: 8080, |
| Protocol: protocol.HTTP, |
| }, |
| }, |
| Resolution: resolution, |
| }) |
| |
| s.Discovery.MemRegistry.AddInstance("edsdns.svc.cluster.local", &model.ServiceInstance{ |
| Endpoint: &model.IstioEndpoint{ |
| Address: "somevip.com", |
| EndpointPort: 8080, |
| ServicePortName: "http", |
| }, |
| ServicePort: &model.Port{ |
| Name: "http", |
| Port: 8080, |
| Protocol: protocol.HTTP, |
| }, |
| }) |
| |
| fullPush(s) |
| } |
| |
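| // addOverlappingEndpoints registers a service exposing UDP and TCP ports that share port number 53, |
| // mirroring the kube-dns use case, then triggers a full push. |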
| func addOverlappingEndpoints(s *xds.FakeDiscoveryServer) { |
| s.Discovery.MemRegistry.AddService(&model.Service{ |
| Hostname: "overlapping.cluster.local", |
| Ports: model.PortList{ |
| { |
| Name: "dns", |
| Port: 53, |
| Protocol: protocol.UDP, |
| }, |
| { |
| Name: "tcp-dns", |
| Port: 53, |
| Protocol: protocol.TCP, |
| }, |
| }, |
| }) |
| s.Discovery.MemRegistry.AddInstance("overlapping.cluster.local", &model.ServiceInstance{ |
| Endpoint: &model.IstioEndpoint{ |
| Address: "10.0.0.53", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| }, |
| ServicePort: &model.Port{ |
| Name: "tcp-dns", |
| Port: 53, |
| Protocol: protocol.TCP, |
| }, |
| }) |
| fullPush(s) |
| } |
| |
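| // addUnhealthyCluster registers unhealthy.svc.cluster.local with a single unhealthy endpoint, then |
| // triggers a full push. |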
| func addUnhealthyCluster(s *xds.FakeDiscoveryServer) { |
| s.Discovery.MemRegistry.AddService(&model.Service{ |
| Hostname: "unhealthy.svc.cluster.local", |
| Ports: model.PortList{ |
| { |
| Name: "tcp-dns", |
| Port: 53, |
| Protocol: protocol.TCP, |
| }, |
| }, |
| }) |
| s.Discovery.MemRegistry.AddInstance("unhealthy.svc.cluster.local", &model.ServiceInstance{ |
| Endpoint: &model.IstioEndpoint{ |
| Address: "10.0.0.53", |
| EndpointPort: 53, |
| ServicePortName: "tcp-dns", |
| HealthStatus: model.UnHealthy, |
| }, |
| ServicePort: &model.Port{ |
| Name: "tcp-dns", |
| Port: 53, |
| Protocol: protocol.TCP, |
| }, |
| }) |
| fullPush(s) |
| } |
| |
| // Verify the endpoint debug interface is installed and returns some string. |
| // TODO: parse response, check if data captured matches what we expect. |
| // TODO: use this in integration tests. |
| // TODO: refine the output |
| // TODO: dump the ServiceInstances as well |
| func testEdsz(t *testing.T, s *xds.FakeDiscoveryServer, proxyID string) { |
| req, err := http.NewRequest("GET", "/debug/edsz?proxyID="+proxyID, nil) |
| if err != nil { |
| t.Fatal(err) |
| } |
| rr := httptest.NewRecorder() |
| debug := http.HandlerFunc(s.Discovery.Edsz) |
| debug.ServeHTTP(rr, req) |
| |
| data, err := io.ReadAll(rr.Body) |
| if err != nil { |
| t.Fatalf("Failed to read /edsz") |
| } |
| statusStr := string(data) |
| |
| if !strings.Contains(statusStr, "\"outbound|8080||eds.test.svc.cluster.local\"") { |
| t.Fatal("Mock eds service not found ", statusStr) |
| } |
| } |
| |
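| // contains reports whether e is present in s. |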
| func contains(s []string, e string) bool { |
| for _, a := range s { |
| if a == e { |
| return true |
| } |
| } |
| return false |
| } |