blob: 47311f65e306f126ed4d31704006f1a99b3b0d30 [file] [log] [blame]
// Copyright 2020 Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package adsc
import (
"fmt"
"log"
"net"
"os"
"strings"
"sync"
"testing"
"time"
)
import (
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
xdsapi "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/protobuf/testing/protocmp"
any "google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
"istio.io/api/label"
mcp "istio.io/api/mcp/v1alpha1"
networking "istio.io/api/networking/v1alpha3"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
)
type testAdscRunServer struct{}
var StreamHandler func(stream xdsapi.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
func (t *testAdscRunServer) StreamAggregatedResources(stream xdsapi.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
return StreamHandler(stream)
}
func (t *testAdscRunServer) DeltaAggregatedResources(xdsapi.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return nil
}
func TestADSC_Run(t *testing.T) {
type testCase struct {
desc string
inAdsc *ADSC
streamHandler func(server xdsapi.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
expectedADSResources *ADSC
validator func(testCase) error
}
var tests []testCase
type testDesc struct {
desc string
reqTypeUrls []string
expectedTypeUrls []string // nil means equals to requested
validator func(testCase) error
}
descs := []testDesc{
{
desc: "stream-no-resources",
reqTypeUrls: []string{},
},
{
desc: "stream-2-unnamed-resources",
reqTypeUrls: []string{"foo", "bar"},
},
// todo tests for listeners, clusters, eds, and routes, not sure how to do this.
}
initTypeUrls := func() []string {
var ret []string
for _, req := range ConfigInitialRequests() {
ret = append(ret, req.TypeUrl)
}
return ret
}()
incompleteTypeUrls := func() []string {
var ret []string
for idx, item := range initTypeUrls {
if strings.Count(item, "/") == 3 {
ret = append(ret, initTypeUrls[:idx]...)
ret = append(ret, initTypeUrls[idx+1:]...)
break
}
}
if ret == nil {
ret = initTypeUrls
}
return ret
}()
descs = append(descs, testDesc{
desc: "mcp-should-hasSynced",
reqTypeUrls: initTypeUrls,
validator: func(tc testCase) error {
if !tc.inAdsc.HasSynced() {
return fmt.Errorf("adsc not synced")
}
return nil
},
})
if len(incompleteTypeUrls) != len(initTypeUrls) {
descs = append(descs, testDesc{
desc: "mcp-should-not-hasSynced",
reqTypeUrls: initTypeUrls,
expectedTypeUrls: incompleteTypeUrls,
validator: func(tc testCase) error {
if tc.inAdsc.HasSynced() {
return fmt.Errorf("adsc synced but should not")
}
return nil
},
})
}
for _, item := range descs {
desc := item // avoid refer to on-stack-var
expected := map[string]*xdsapi.DiscoveryResponse{}
if desc.expectedTypeUrls == nil {
desc.expectedTypeUrls = desc.reqTypeUrls
}
var initReqs []*xdsapi.DiscoveryRequest
for _, typeURL := range desc.reqTypeUrls {
initReqs = append(initReqs, &xdsapi.DiscoveryRequest{TypeUrl: typeURL})
}
for _, typeURL := range desc.expectedTypeUrls {
expected[typeURL] = &xdsapi.DiscoveryResponse{TypeUrl: typeURL}
}
if desc.validator == nil {
desc.validator = func(tc testCase) error {
if !cmp.Equal(tc.inAdsc.Received, tc.expectedADSResources.Received, protocmp.Transform()) {
return fmt.Errorf("%s: expected recv %v got %v", tc.desc, tc.expectedADSResources.Received, tc.inAdsc.Received)
}
return nil
}
}
tc := testCase{
desc: desc.desc,
inAdsc: &ADSC{
Received: make(map[string]*xdsapi.DiscoveryResponse),
Updates: make(chan string),
XDSUpdates: make(chan *xdsapi.DiscoveryResponse),
RecvWg: sync.WaitGroup{},
cfg: &Config{
InitialDiscoveryRequests: initReqs,
},
VersionInfo: map[string]string{},
sync: map[string]time.Time{},
},
streamHandler: func(stream xdsapi.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
for _, typeURL := range desc.expectedTypeUrls {
_ = stream.Send(&xdsapi.DiscoveryResponse{
TypeUrl: typeURL,
})
}
return nil
},
expectedADSResources: &ADSC{
Received: expected,
},
validator: desc.validator,
}
tests = append(tests, tc)
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
StreamHandler = tt.streamHandler
l, err := net.Listen("tcp", ":0")
if err != nil {
t.Errorf("Unable to listen with tcp err %v", err)
return
}
tt.inAdsc.url = l.Addr().String()
xds := grpc.NewServer()
xdsapi.RegisterAggregatedDiscoveryServiceServer(xds, new(testAdscRunServer))
go func() {
err = xds.Serve(l)
if err != nil {
log.Println(err)
}
}()
defer xds.GracefulStop()
if err != nil {
t.Errorf("Could not start serving ads server %v", err)
return
}
if err := tt.inAdsc.Dial(); err != nil {
t.Errorf("Dial error: %v", err)
return
}
if err := tt.inAdsc.Run(); err != nil {
t.Errorf("ADSC: failed running %v", err)
return
}
tt.inAdsc.RecvWg.Wait()
if err := tt.validator(tt); err != nil {
t.Error(err)
}
})
}
}
func TestADSC_Save(t *testing.T) {
tests := []struct {
desc string
expectedJSON map[string]string
adsc *ADSC
err error
}{
{
desc: "empty",
expectedJSON: map[string]string{
"_lds_tcp": `[]`,
"_lds_http": `[]`,
"_rds": `[]`,
"_eds": `[]`,
"_ecds": `[]`,
"_cds": `[]`,
},
err: nil,
adsc: &ADSC{
tcpListeners: map[string]*listener.Listener{},
httpListeners: map[string]*listener.Listener{},
routes: map[string]*route.RouteConfiguration{},
edsClusters: map[string]*cluster.Cluster{},
clusters: map[string]*cluster.Cluster{},
eds: map[string]*endpoint.ClusterLoadAssignment{},
},
},
{
desc: "populated",
err: nil,
expectedJSON: map[string]string{
"_lds_tcp": `[
{
"listener-1": {
"name": "bar"
}
},
{
"listener-2": {
"name": "mar"
}
}
]`,
"_lds_http": `[
{
"http-list-1": {
"name": "bar"
}
},
{
"http-list-2": {
"name": "mar"
}
}
]`,
"_rds": `[
{
"route-1": {
"name": "mar"
}
}
]`,
"_eds": `[
{
"load-assignment-1": {
"clusterName": "foo"
}
}
]`,
"_ecds": `[
{
"eds-cluster-1": {
"name": "test"
}
}
]`,
"_cds": `[
{
"cluster-1": {
"name": "foo"
}
}
]`,
},
adsc: &ADSC{
tcpListeners: map[string]*listener.Listener{
"listener-1": {
Name: "bar",
},
"listener-2": {
Name: "mar",
},
},
httpListeners: map[string]*listener.Listener{
"http-list-1": {
Name: "bar",
},
"http-list-2": {
Name: "mar",
},
},
routes: map[string]*route.RouteConfiguration{
"route-1": {
Name: "mar",
},
},
edsClusters: map[string]*cluster.Cluster{
"eds-cluster-1": {
Name: "test",
},
},
clusters: map[string]*cluster.Cluster{
"cluster-1": {
Name: "foo",
},
},
eds: map[string]*endpoint.ClusterLoadAssignment{
"load-assignment-1": {
ClusterName: "foo",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
base := t.TempDir()
if err := tt.adsc.Save(base); (err == nil && tt.err != nil) || (err != nil && tt.err == nil) {
t.Errorf("AdscSave() => %v expected err %v", err, tt.err)
}
if ldsTCP := readFile(base+"_lds_tcp.json", t); ldsTCP != tt.expectedJSON["_lds_tcp"] {
t.Errorf("AdscSave() => %s expected ldsTcp %s\n%v", ldsTCP, tt.expectedJSON["_lds_tcp"], cmp.Diff(ldsTCP, tt.expectedJSON["_lds_tcp"]))
}
if ldsHTTP := readFile(base+"_lds_http.json", t); ldsHTTP != tt.expectedJSON["_lds_http"] {
t.Errorf("AdscSave() => %s expected ldsHttp %s", ldsHTTP, tt.expectedJSON["_lds_http"])
}
if rds := readFile(base+"_rds.json", t); rds != tt.expectedJSON["_rds"] {
t.Errorf("AdscSave() => %s expected rds %s", rds, tt.expectedJSON["_rds"])
}
if ecds := readFile(base+"_ecds.json", t); ecds != tt.expectedJSON["_ecds"] {
t.Errorf("AdscSave() => %s expected ecds %s", ecds, tt.expectedJSON["_ecds"])
}
if cds := readFile(base+"_cds.json", t); cds != tt.expectedJSON["_cds"] {
t.Errorf("AdscSave() => %s expected cds %s", cds, tt.expectedJSON["_cds"])
}
if eds := readFile(base+"_eds.json", t); eds != tt.expectedJSON["_eds"] {
t.Errorf("AdscSave() => %s expected eds %s", eds, tt.expectedJSON["_eds"])
}
saveTeardown(base, t)
})
}
}
func saveTeardown(base string, t *testing.T) {
if err := os.Remove(base + "_lds_tcp.json"); err != nil {
t.Errorf("Unable to cleanup: %v", err)
}
if err := os.Remove(base + "_lds_http.json"); err != nil {
t.Errorf("Unable to cleanup: %v", err)
}
if err := os.Remove(base + "_cds.json"); err != nil {
t.Errorf("Unable to cleanup: %v", err)
}
if err := os.Remove(base + "_rds.json"); err != nil {
t.Errorf("Unable to cleanup: %v", err)
}
if err := os.Remove(base + "_ecds.json"); err != nil {
t.Errorf("Unable to cleanup: %v", err)
}
if err := os.Remove(base + "_eds.json"); err != nil {
t.Errorf("Unable to cleanup: %v", err)
}
}
func readFile(dir string, t *testing.T) string {
dat, err := os.ReadFile(dir)
if err != nil {
t.Fatalf("file %s issue: %v", dat, err)
}
return string(dat)
}
func TestADSC_handleMCP(t *testing.T) {
rev := "test-rev"
adsc := &ADSC{
VersionInfo: map[string]string{},
Store: model.MakeIstioStore(memory.Make(collections.Pilot)),
cfg: &Config{Revision: rev},
}
patchLabel := func(lbls map[string]string, name, value string) map[string]string {
if lbls == nil {
lbls = map[string]string{}
}
lbls[name] = value
return lbls
}
tests := []struct {
desc string
resources []*any.Any
expectedResources [][]string
}{
{
desc: "create-resources",
resources: []*any.Any{
constructResource("foo1", "foo1.bar.com", "192.1.1.1", "1"),
constructResource("foo2", "foo2.bar.com", "192.1.1.2", "1"),
},
expectedResources: [][]string{
{"foo1", "foo1.bar.com", "192.1.1.1"},
{"foo2", "foo2.bar.com", "192.1.1.2"},
},
},
{
desc: "create-resources-rev-1",
resources: []*any.Any{
constructResource("foo1", "foo1.bar.com", "192.1.1.1", "1"),
constructResourceWithOptions("foo2", "foo2.bar.com", "192.1.1.2", "1", func(resource *mcp.Resource) {
resource.Metadata.Labels = patchLabel(resource.Metadata.Labels, label.IoIstioRev.Name, rev+"wrong") // to del
}),
constructResourceWithOptions("foo3", "foo3.bar.com", "192.1.1.3", "1", func(resource *mcp.Resource) {
resource.Metadata.Labels = patchLabel(resource.Metadata.Labels, label.IoIstioRev.Name, rev) // to add
}),
},
expectedResources: [][]string{
{"foo1", "foo1.bar.com", "192.1.1.1"},
{"foo3", "foo3.bar.com", "192.1.1.3"},
},
},
{
desc: "create-resources-rev-2",
resources: []*any.Any{
constructResource("foo1", "foo1.bar.com", "192.1.1.1", "1"),
constructResourceWithOptions("foo2", "foo2.bar.com", "192.1.1.2", "1", func(resource *mcp.Resource) {
resource.Metadata.Labels = patchLabel(resource.Metadata.Labels, label.IoIstioRev.Name, rev) // to add back
}),
constructResourceWithOptions("foo3", "foo3.bar.com", "192.1.1.3", "1", func(resource *mcp.Resource) {
resource.Metadata.Labels = patchLabel(resource.Metadata.Labels, label.IoIstioRev.Name, rev+"wrong") // to del
}),
},
expectedResources: [][]string{
{"foo1", "foo1.bar.com", "192.1.1.1"},
{"foo2", "foo2.bar.com", "192.1.1.2"},
},
},
{
desc: "update-and-create-resources",
resources: []*any.Any{
constructResource("foo1", "foo1.bar.com", "192.1.1.11", "2"),
constructResource("foo2", "foo2.bar.com", "192.1.1.22", "1"),
constructResource("foo3", "foo3.bar.com", "192.1.1.3", ""),
},
expectedResources: [][]string{
{"foo1", "foo1.bar.com", "192.1.1.11"},
{"foo2", "foo2.bar.com", "192.1.1.2"},
{"foo3", "foo3.bar.com", "192.1.1.3"},
},
},
{
desc: "update-delete-and-create-resources",
resources: []*any.Any{
constructResource("foo2", "foo2.bar.com", "192.1.1.222", "4"),
constructResource("foo4", "foo4.bar.com", "192.1.1.4", "1"),
},
expectedResources: [][]string{
{"foo2", "foo2.bar.com", "192.1.1.222"},
{"foo4", "foo4.bar.com", "192.1.1.4"},
},
},
{
desc: "update-and-delete-resources",
resources: []*any.Any{
constructResource("foo2", "foo2.bar.com", "192.2.2.22", "3"),
constructResource("foo3", "foo3.bar.com", "192.1.1.33", ""),
},
expectedResources: [][]string{
{"foo2", "foo2.bar.com", "192.2.2.22"},
{"foo3", "foo3.bar.com", "192.1.1.33"},
},
},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
gvk := []string{"networking.istio.io", "v1alpha3", "ServiceEntry"}
adsc.handleMCP(gvk, tt.resources)
configs, _ := adsc.Store.List(collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(), "")
if len(configs) != len(tt.expectedResources) {
t.Errorf("expected %v got %v", len(tt.expectedResources), len(configs))
}
configMap := make(map[string][]string)
for _, conf := range configs {
service, _ := conf.Spec.(*networking.ServiceEntry)
configMap[conf.Name] = []string{conf.Name, service.Hosts[0], service.Addresses[0]}
}
for _, expected := range tt.expectedResources {
got, ok := configMap[expected[0]]
if !ok {
t.Errorf("expected %v got none", expected)
} else {
for i, value := range expected {
if value != got[i] {
t.Errorf("expected %v got %v", value, got[i])
}
}
}
}
})
}
}
func constructResourceWithOptions(name string, host string, address, version string, options ...func(resource *mcp.Resource)) *any.Any {
service := &networking.ServiceEntry{
Hosts: []string{host},
Addresses: []string{address},
}
seAny, _ := any.New(service)
resource := &mcp.Resource{
Metadata: &mcp.Metadata{
Name: "default/" + name,
CreateTime: timestamppb.Now(),
Version: version,
},
Body: seAny,
}
for _, o := range options {
o(resource)
}
resAny, _ := any.New(resource)
return &any.Any{
TypeUrl: resAny.TypeUrl,
Value: resAny.Value,
}
}
func constructResource(name string, host string, address, version string) *any.Any {
return constructResourceWithOptions(name, host, address, version)
}