blob: 47311f65e306f126ed4d31704006f1a99b3b0d30 [file] [log] [blame]
// Copyright 2020 Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package adsc
import (
import (
cluster ""
endpoint ""
listener ""
route ""
xdsapi ""
any ""
mcp ""
networking ""
import (
// testAdscRunServer is the stub aggregated discovery service that
// TestADSC_Run registers on its in-process gRPC server.
type testAdscRunServer struct{}
// StreamHandler is the callback that
// testAdscRunServer.StreamAggregatedResources delegates to. TestADSC_Run
// reassigns it at the start of every subtest.
var StreamHandler func(stream xdsapi.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
func (t *testAdscRunServer) StreamAggregatedResources(stream xdsapi.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
return StreamHandler(stream)
func (t *testAdscRunServer) DeltaAggregatedResources(xdsapi.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return nil
// TestADSC_Run starts an in-process gRPC ADS server for each case, points an
// ADSC at it via Dial and Run, and then applies the case's validator to the
// resulting client state.
//
// Cases are generated from a short list of descriptions (testDesc): each one
// names the type URLs the client requests and the type URLs the stub server
// answers with.
//
// NOTE(review): this view of the function carries no closing braces and some
// statements look truncated (for example the body of the final validator
// check), so the exact nesting should be confirmed against the full file.
func TestADSC_Run(t *testing.T) {
// testCase is the fully expanded form executed by the t.Run loop below.
type testCase struct {
desc string
inAdsc *ADSC
streamHandler func(server xdsapi.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
expectedADSResources *ADSC
validator func(testCase) error
var tests []testCase
// testDesc is the compact description that is expanded into a testCase.
type testDesc struct {
desc string
reqTypeUrls []string
expectedTypeUrls []string // nil means equals to requested
validator func(testCase) error
descs := []testDesc{
desc: "stream-no-resources",
reqTypeUrls: []string{},
desc: "stream-2-unnamed-resources",
reqTypeUrls: []string{"foo", "bar"},
// todo tests for listeners, clusters, eds, and routes, not sure how to do this.
// initTypeUrls gathers the TypeUrl of every request returned by
// ConfigInitialRequests(). It is ranged over and passed to len() below, so the
// function literal is presumably invoked immediately — confirm.
initTypeUrls := func() []string {
var ret []string
for _, req := range ConfigInitialRequests() {
ret = append(ret, req.TypeUrl)
return ret
// incompleteTypeUrls is initTypeUrls with an entry whose type URL contains
// exactly three "/" left out; when no entry matches, it falls back to the
// full initTypeUrls list.
incompleteTypeUrls := func() []string {
var ret []string
for idx, item := range initTypeUrls {
if strings.Count(item, "/") == 3 {
ret = append(ret, initTypeUrls[:idx]...)
ret = append(ret, initTypeUrls[idx+1:]...)
if ret == nil {
ret = initTypeUrls
return ret
// The server answers every initial request type, so HasSynced must be true.
descs = append(descs, testDesc{
desc: "mcp-should-hasSynced",
reqTypeUrls: initTypeUrls,
validator: func(tc testCase) error {
if !tc.inAdsc.HasSynced() {
return fmt.Errorf("adsc not synced")
return nil
// The negative case is only added when the two lists differ in length, i.e.
// when an entry really was left out of incompleteTypeUrls.
if len(incompleteTypeUrls) != len(initTypeUrls) {
descs = append(descs, testDesc{
desc: "mcp-should-not-hasSynced",
reqTypeUrls: initTypeUrls,
expectedTypeUrls: incompleteTypeUrls,
validator: func(tc testCase) error {
if tc.inAdsc.HasSynced() {
return fmt.Errorf("adsc synced but should not")
return nil
// Expand every description into a runnable testCase.
for _, item := range descs {
desc := item // avoid refer to on-stack-var
expected := map[string]*xdsapi.DiscoveryResponse{}
if desc.expectedTypeUrls == nil {
desc.expectedTypeUrls = desc.reqTypeUrls
var initReqs []*xdsapi.DiscoveryRequest
for _, typeURL := range desc.reqTypeUrls {
initReqs = append(initReqs, &xdsapi.DiscoveryRequest{TypeUrl: typeURL})
for _, typeURL := range desc.expectedTypeUrls {
expected[typeURL] = &xdsapi.DiscoveryResponse{TypeUrl: typeURL}
// Default validator: the client's Received map must equal the expected map
// when compared with protocmp.Transform().
if desc.validator == nil {
desc.validator = func(tc testCase) error {
if !cmp.Equal(tc.inAdsc.Received, tc.expectedADSResources.Received, protocmp.Transform()) {
return fmt.Errorf("%s: expected recv %v got %v", tc.desc, tc.expectedADSResources.Received, tc.inAdsc.Received)
return nil
tc := testCase{
desc: desc.desc,
inAdsc: &ADSC{
Received: make(map[string]*xdsapi.DiscoveryResponse),
Updates: make(chan string),
XDSUpdates: make(chan *xdsapi.DiscoveryResponse),
RecvWg: sync.WaitGroup{},
cfg: &Config{
InitialDiscoveryRequests: initReqs,
VersionInfo: map[string]string{},
sync: map[string]time.Time{},
// The stub server sends one DiscoveryResponse carrying only the TypeUrl for
// each expected type URL; Send errors are discarded.
streamHandler: func(stream xdsapi.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
for _, typeURL := range desc.expectedTypeUrls {
_ = stream.Send(&xdsapi.DiscoveryResponse{
TypeUrl: typeURL,
return nil
expectedADSResources: &ADSC{
Received: expected,
validator: desc.validator,
tests = append(tests, tc)
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
// StreamHandler is a package-level variable read by testAdscRunServer, so it
// is shared by every subtest.
StreamHandler = tt.streamHandler
// Port 0 asks the OS for any free port; the chosen address is copied into the
// client's url below.
l, err := net.Listen("tcp", ":0")
if err != nil {
// NOTE(review): t.Errorf does not stop the subtest; unless a return follows
// here in the full file, l is nil when l.Addr() is called below.
t.Errorf("Unable to listen with tcp err %v", err)
tt.inAdsc.url = l.Addr().String()
xds := grpc.NewServer()
xdsapi.RegisterAggregatedDiscoveryServiceServer(xds, new(testAdscRunServer))
go func() {
// NOTE(review): this assigns the enclosing err from the serving goroutine
// while the subtest goroutine appears to read it just below — looks like a
// data race; confirm against the full file.
err = xds.Serve(l)
if err != nil {
defer xds.GracefulStop()
if err != nil {
t.Errorf("Could not start serving ads server %v", err)
if err := tt.inAdsc.Dial(); err != nil {
t.Errorf("Dial error: %v", err)
if err := tt.inAdsc.Run(); err != nil {
t.Errorf("ADSC: failed running %v", err)
if err := tt.validator(tt); err != nil {
// TestADSC_Save calls ADSC.Save with a base path and compares the contents of
// six "<base>_<kind>.json" files against the expected JSON text for an empty
// and a populated ADSC, then removes the files with saveTeardown.
//
// NOTE(review): this view of the function carries no closing braces and the
// expected JSON fixtures look truncated, so the literals below should be
// confirmed against the full file.
func TestADSC_Save(t *testing.T) {
tests := []struct {
desc string
// expectedJSON maps a file suffix (e.g. "_lds_tcp") to the exact text
// expected in the corresponding file.
expectedJSON map[string]string
adsc *ADSC
// err is only checked for nil-ness against the error returned by Save.
err error
desc: "empty",
expectedJSON: map[string]string{
"_lds_tcp": `[]`,
"_lds_http": `[]`,
"_rds": `[]`,
"_eds": `[]`,
"_ecds": `[]`,
"_cds": `[]`,
err: nil,
adsc: &ADSC{
tcpListeners: map[string]*listener.Listener{},
httpListeners: map[string]*listener.Listener{},
routes: map[string]*route.RouteConfiguration{},
edsClusters: map[string]*cluster.Cluster{},
clusters: map[string]*cluster.Cluster{},
eds: map[string]*endpoint.ClusterLoadAssignment{},
desc: "populated",
err: nil,
expectedJSON: map[string]string{
"_lds_tcp": `[
"listener-1": {
"name": "bar"
"listener-2": {
"name": "mar"
"_lds_http": `[
"http-list-1": {
"name": "bar"
"http-list-2": {
"name": "mar"
"_rds": `[
"route-1": {
"name": "mar"
"_eds": `[
"load-assignment-1": {
"clusterName": "foo"
"_ecds": `[
"eds-cluster-1": {
"name": "test"
"_cds": `[
"cluster-1": {
"name": "foo"
adsc: &ADSC{
tcpListeners: map[string]*listener.Listener{
"listener-1": {
Name: "bar",
"listener-2": {
Name: "mar",
httpListeners: map[string]*listener.Listener{
"http-list-1": {
Name: "bar",
"http-list-2": {
Name: "mar",
routes: map[string]*route.RouteConfiguration{
"route-1": {
Name: "mar",
edsClusters: map[string]*cluster.Cluster{
"eds-cluster-1": {
Name: "test",
clusters: map[string]*cluster.Cluster{
"cluster-1": {
Name: "foo",
eds: map[string]*endpoint.ClusterLoadAssignment{
"load-assignment-1": {
ClusterName: "foo",
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
// base is used as a file-name prefix, not a directory: the paths read below
// are base+"_<kind>.json", i.e. siblings of the temp dir rather than files
// inside it.
base := t.TempDir()
// Only agreement on whether an error occurred is checked, not its value.
if err := tt.adsc.Save(base); (err == nil && tt.err != nil) || (err != nil && tt.err == nil) {
t.Errorf("AdscSave() => %v expected err %v", err, tt.err)
// Each file is compared as an exact string, so whitespace and key order in
// the expected JSON matter.
if ldsTCP := readFile(base+"_lds_tcp.json", t); ldsTCP != tt.expectedJSON["_lds_tcp"] {
t.Errorf("AdscSave() => %s expected ldsTcp %s\n%v", ldsTCP, tt.expectedJSON["_lds_tcp"], cmp.Diff(ldsTCP, tt.expectedJSON["_lds_tcp"]))
if ldsHTTP := readFile(base+"_lds_http.json", t); ldsHTTP != tt.expectedJSON["_lds_http"] {
t.Errorf("AdscSave() => %s expected ldsHttp %s", ldsHTTP, tt.expectedJSON["_lds_http"])
if rds := readFile(base+"_rds.json", t); rds != tt.expectedJSON["_rds"] {
t.Errorf("AdscSave() => %s expected rds %s", rds, tt.expectedJSON["_rds"])
if ecds := readFile(base+"_ecds.json", t); ecds != tt.expectedJSON["_ecds"] {
t.Errorf("AdscSave() => %s expected ecds %s", ecds, tt.expectedJSON["_ecds"])
if cds := readFile(base+"_cds.json", t); cds != tt.expectedJSON["_cds"] {
t.Errorf("AdscSave() => %s expected cds %s", cds, tt.expectedJSON["_cds"])
if eds := readFile(base+"_eds.json", t); eds != tt.expectedJSON["_eds"] {
t.Errorf("AdscSave() => %s expected eds %s", eds, tt.expectedJSON["_eds"])
// The files sit outside the directory that t.TempDir cleans up, so they are
// removed explicitly.
saveTeardown(base, t)
// saveTeardown removes the six JSON files that TestADSC_Save reads back for
// the given base path prefix.
//
// base is a path prefix, not a directory: each file is addressed as
// base+"<suffix>". A failed removal is reported with t.Errorf and does not
// stop the remaining removals.
func saveTeardown(base string, t *testing.T) {
	t.Helper()
	// Same files, in the same order, as the original hand-written sequence.
	for _, suffix := range []string{
		"_lds_tcp.json",
		"_lds_http.json",
		"_cds.json",
		"_rds.json",
		"_ecds.json",
		"_eds.json",
	} {
		if err := os.Remove(base + suffix); err != nil {
			t.Errorf("Unable to cleanup: %v", err)
		}
	}
}
// readFile returns the full contents of the file at path dir as a string.
//
// Despite its name, dir is the path of the file to read. If the file cannot
// be read the test is aborted with t.Fatalf; the message names the path (the
// previous version formatted the empty data buffer there instead).
func readFile(dir string, t *testing.T) string {
	t.Helper()
	dat, err := os.ReadFile(dir)
	if err != nil {
		t.Fatalf("file %s issue: %v", dir, err)
	}
	return string(dat)
}
// TestADSC_handleMCP feeds successive batches of MCP resources to
// ADSC.handleMCP and checks which ServiceEntry configs the store lists
// afterwards.
//
// A single ADSC (and therefore a single Store) is created once and reused by
// every case, so the cases build on one another and must run in the order
// listed.
//
// NOTE(review): this view of the function carries no closing braces, so the
// exact nesting of the table literals should be confirmed against the full
// file.
func TestADSC_handleMCP(t *testing.T) {
rev := "test-rev"
adsc := &ADSC{
VersionInfo: map[string]string{},
Store: model.MakeIstioStore(memory.Make(collections.Pilot)),
cfg: &Config{Revision: rev},
// patchLabel sets lbls[name] = value, allocating the map first when it is
// nil, and returns the map.
patchLabel := func(lbls map[string]string, name, value string) map[string]string {
if lbls == nil {
lbls = map[string]string{}
lbls[name] = value
return lbls
tests := []struct {
desc string
// resources is the batch handed to handleMCP for this case.
resources []*any.Any
// expectedResources lists, per expected config, {name, first host, first
// address}.
expectedResources [][]string
desc: "create-resources",
resources: []*any.Any{
constructResource("foo1", "", "", "1"),
constructResource("foo2", "", "", "1"),
expectedResources: [][]string{
{"foo1", "", ""},
{"foo2", "", ""},
desc: "create-resources-rev-1",
resources: []*any.Any{
constructResource("foo1", "", "", "1"),
constructResourceWithOptions("foo2", "", "", "1", func(resource *mcp.Resource) {
resource.Metadata.Labels = patchLabel(resource.Metadata.Labels, label.IoIstioRev.Name, rev+"wrong") // to del
constructResourceWithOptions("foo3", "", "", "1", func(resource *mcp.Resource) {
resource.Metadata.Labels = patchLabel(resource.Metadata.Labels, label.IoIstioRev.Name, rev) // to add
expectedResources: [][]string{
{"foo1", "", ""},
{"foo3", "", ""},
desc: "create-resources-rev-2",
resources: []*any.Any{
constructResource("foo1", "", "", "1"),
constructResourceWithOptions("foo2", "", "", "1", func(resource *mcp.Resource) {
resource.Metadata.Labels = patchLabel(resource.Metadata.Labels, label.IoIstioRev.Name, rev) // to add back
constructResourceWithOptions("foo3", "", "", "1", func(resource *mcp.Resource) {
resource.Metadata.Labels = patchLabel(resource.Metadata.Labels, label.IoIstioRev.Name, rev+"wrong") // to del
expectedResources: [][]string{
{"foo1", "", ""},
{"foo2", "", ""},
desc: "update-and-create-resources",
resources: []*any.Any{
constructResource("foo1", "", "", "2"),
constructResource("foo2", "", "", "1"),
constructResource("foo3", "", "", ""),
expectedResources: [][]string{
{"foo1", "", ""},
{"foo2", "", ""},
{"foo3", "", ""},
desc: "update-delete-and-create-resources",
resources: []*any.Any{
constructResource("foo2", "", "", "4"),
constructResource("foo4", "", "", "1"),
expectedResources: [][]string{
{"foo2", "", ""},
{"foo4", "", ""},
desc: "update-and-delete-resources",
resources: []*any.Any{
constructResource("foo2", "", "", "3"),
constructResource("foo3", "", "", ""),
expectedResources: [][]string{
{"foo2", "", ""},
{"foo3", "", ""},
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
gvk := []string{"", "v1alpha3", "ServiceEntry"}
adsc.handleMCP(gvk, tt.resources)
// The List error is discarded; only the returned configs are inspected.
configs, _ := adsc.Store.List(collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(), "")
if len(configs) != len(tt.expectedResources) {
t.Errorf("expected %v got %v", len(tt.expectedResources), len(configs))
// Index the stored configs by name as {name, first host, first address}.
configMap := make(map[string][]string)
for _, conf := range configs {
// NOTE(review): the type-assertion result is ignored and Hosts[0] and
// Addresses[0] are indexed unchecked, so an unexpected Spec type or an empty
// slice would panic rather than fail the test cleanly.
service, _ := conf.Spec.(*networking.ServiceEntry)
configMap[conf.Name] = []string{conf.Name, service.Hosts[0], service.Addresses[0]}
for _, expected := range tt.expectedResources {
got, ok := configMap[expected[0]]
if !ok {
t.Errorf("expected %v got none", expected)
} else {
// Compare name, host and address position by position.
for i, value := range expected {
if value != got[i] {
t.Errorf("expected %v got %v", value, got[i])
func constructResourceWithOptions(name string, host string, address, version string, options ...func(resource *mcp.Resource)) *any.Any {
service := &networking.ServiceEntry{
Hosts: []string{host},
Addresses: []string{address},
seAny, _ := any.New(service)
resource := &mcp.Resource{
Metadata: &mcp.Metadata{
Name: "default/" + name,
CreateTime: timestamppb.Now(),
Version: version,
Body: seAny,
for _, o := range options {
resAny, _ := any.New(resource)
return &any.Any{
TypeUrl: resAny.TypeUrl,
Value: resAny.Value,
func constructResource(name string, host string, address, version string) *any.Any {
return constructResourceWithOptions(name, host, address, version)