blob: 42e55f676ecb47f7fb215561b4470037132cb1f8 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds
import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
)
import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
uatomic "go.uber.org/atomic"
"google.golang.org/grpc"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
)
// createProxies builds n test connections. Each connection gets a conID of the
// form "proxy-<index>" (indices start at 0), its own unbuffered push channel,
// and a fakeStream as its stream.
func createProxies(n int) []*Connection {
	conns := make([]*Connection, 0, n)
	for idx := 0; idx < n; idx++ {
		conn := &Connection{
			conID:       fmt.Sprintf("proxy-%v", idx),
			pushChannel: make(chan *Event),
			stream:      &fakeStream{},
		}
		conns = append(conns, conn)
	}
	return conns
}
// wgDoneOrTimeout waits for wg to complete for at most timeout.
// It returns true if wg finished in time and false if the timeout elapsed first.
//
// The helper goroutine signals completion by closing the channel rather than
// sending on it, so it never blocks on a receiver that has already given up.
// After a timeout the goroutine still waits on wg, but it exits as soon as wg
// completes instead of leaking forever.
func wgDoneOrTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	// Use an explicit timer so it can be released promptly on the success path.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}
// TestSendPushesManyPushes enqueues 100 rounds of push requests for five
// proxies while doSendPushes drains the queue, and then verifies that every
// proxy received at least one push.
func TestSendPushesManyPushes(t *testing.T) {
	stopCh := make(chan struct{})
	defer close(stopCh)

	// Passed to doSendPushes; presumably bounds concurrent pushes (capacity 2).
	semaphore := make(chan struct{}, 2)
	queue := NewPushQueue()
	defer queue.ShutDown()

	proxies := createProxies(5)

	// pushes counts received push events per connection ID, guarded by pushesMu.
	pushes := make(map[string]int)
	pushesMu := &sync.Mutex{}

	for _, proxy := range proxies {
		proxy := proxy
		// Start receive thread
		go func() {
			for {
				select {
				case p := <-proxy.pushChannel:
					p.done()
					pushesMu.Lock()
					pushes[proxy.conID]++
					pushesMu.Unlock()
				case <-stopCh:
					return
				}
			}
		}()
	}
	go doSendPushes(stopCh, semaphore, queue)

	for push := 0; push < 100; push++ {
		for _, proxy := range proxies {
			queue.Enqueue(proxy, &model.PushRequest{Push: &model.PushContext{}})
		}
		time.Sleep(time.Millisecond * 10)
	}
	// Wait until the queue reports nothing pending before inspecting the counts.
	for queue.Pending() > 0 {
		time.Sleep(time.Millisecond)
	}

	pushesMu.Lock()
	defer pushesMu.Unlock()
	// Iterate over the proxies rather than the map: a proxy that never received
	// a push has no map entry, so ranging over the map could never detect it.
	for _, proxy := range proxies {
		if pushes[proxy.conID] == 0 {
			t.Fatalf("Proxy %v had 0 pushes", proxy.conID)
		}
	}
}
// TestSendPushesSinglePush enqueues exactly one push request for each of five
// proxies and verifies that each proxy receives exactly one push within one
// second.
func TestSendPushesSinglePush(t *testing.T) {
	stopCh := make(chan struct{})
	defer close(stopCh)

	// Passed to doSendPushes; presumably bounds concurrent pushes (capacity 2).
	semaphore := make(chan struct{}, 2)
	queue := NewPushQueue()
	defer queue.ShutDown()

	proxies := createProxies(5)

	// One Done per expected push.
	wg := &sync.WaitGroup{}
	wg.Add(5)

	// pushes counts received push events per connection ID, guarded by pushesMu.
	pushes := make(map[string]int)
	pushesMu := &sync.Mutex{}

	for _, proxy := range proxies {
		proxy := proxy
		// Start receive thread
		go func() {
			for {
				select {
				case p := <-proxy.pushChannel:
					p.done()
					pushesMu.Lock()
					pushes[proxy.conID]++
					pushesMu.Unlock()
					wg.Done()
				case <-stopCh:
					return
				}
			}
		}()
	}
	go doSendPushes(stopCh, semaphore, queue)

	for _, proxy := range proxies {
		queue.Enqueue(proxy, &model.PushRequest{Push: &model.PushContext{}})
	}

	if !wgDoneOrTimeout(wg, time.Second) {
		// Receivers may still be writing to the map, so read it under the lock.
		pushesMu.Lock()
		got := len(pushes)
		pushesMu.Unlock()
		t.Fatalf("Expected 5 pushes but got %v", got)
	}

	expected := map[string]int{
		"proxy-0": 1,
		"proxy-1": 1,
		"proxy-2": 1,
		"proxy-3": 1,
		"proxy-4": 1,
	}
	// Hold the lock while comparing so an unexpected extra push cannot race
	// with the read of the map.
	pushesMu.Lock()
	defer pushesMu.Unlock()
	if !reflect.DeepEqual(expected, pushes) {
		t.Fatalf("Expected pushes %+v, got %+v", expected, pushes)
	}
}
// fakeStream is a no-op stream used by the tests in this file. It embeds
// grpc.ServerStream to inherit the rest of the interface; only Send, Recv and
// Context are overridden here. The embedded interface is left nil by
// createProxies, so the remaining methods are not usable.
type fakeStream struct {
	grpc.ServerStream
}

// Send discards the response and always reports success.
func (f *fakeStream) Send(*discovery.DiscoveryResponse) error { return nil }

// Recv never yields a request: it returns a nil request and a nil error.
func (f *fakeStream) Recv() (*discovery.DiscoveryRequest, error) { return nil, nil }

// Context returns a background context.
func (f *fakeStream) Context() context.Context { return context.Background() }
// TestDebounce drives debounce with sequences of partial and full push requests
// and checks how many partial and full pushes reach the push callback.
// Each table entry receives the update channel and an expect helper that waits
// until the observed counters equal the given (partial, full) values.
func TestDebounce(t *testing.T) {
// This test tests the timeout and debouncing of config updates
// If it is flaking, DebounceAfter may need to be increased, or the code refactored to mock time.
// For now, this seems to work well
opts := debounceOptions{
debounceAfter: time.Millisecond * 50,
debounceMax: time.Millisecond * 100,
enableEDSDebounce: false,
}
tests := []struct {
name string
test func(updateCh chan *model.PushRequest, expect func(partial, full int32))
}{
{
name: "Should not debounce partial pushes",
test: func(updateCh chan *model.PushRequest, expect func(partial, full int32)) {
updateCh <- &model.PushRequest{Full: false}
expect(1, 0)
updateCh <- &model.PushRequest{Full: false}
expect(2, 0)
updateCh <- &model.PushRequest{Full: false}
expect(3, 0)
updateCh <- &model.PushRequest{Full: false}
expect(4, 0)
updateCh <- &model.PushRequest{Full: false}
expect(5, 0)
},
},
{
name: "Should debounce full pushes",
test: func(updateCh chan *model.PushRequest, expect func(partial, full int32)) {
updateCh <- &model.PushRequest{Full: true}
// Checked immediately after sending: no push is expected to have happened yet.
expect(0, 0)
},
},
{
name: "Should send full updates in batches",
test: func(updateCh chan *model.PushRequest, expect func(partial, full int32)) {
updateCh <- &model.PushRequest{Full: true}
updateCh <- &model.PushRequest{Full: true}
expect(0, 1)
},
},
{
name: "Should send full updates in batches, partial updates immediately",
test: func(updateCh chan *model.PushRequest, expect func(partial, full int32)) {
updateCh <- &model.PushRequest{Full: true}
updateCh <- &model.PushRequest{Full: true}
updateCh <- &model.PushRequest{Full: false}
updateCh <- &model.PushRequest{Full: false}
expect(2, 1)
updateCh <- &model.PushRequest{Full: false}
expect(3, 1)
},
},
{
name: "Should force a push after DebounceMax",
test: func(updateCh chan *model.PushRequest, expect func(partial, full int32)) {
// Send many requests within debounce window
updateCh <- &model.PushRequest{Full: true}
time.Sleep(opts.debounceAfter / 2)
updateCh <- &model.PushRequest{Full: true}
time.Sleep(opts.debounceAfter / 2)
updateCh <- &model.PushRequest{Full: true}
time.Sleep(opts.debounceAfter / 2)
updateCh <- &model.PushRequest{Full: true}
time.Sleep(opts.debounceAfter / 2)
expect(0, 1)
},
},
{
name: "Should push synchronously after debounce",
test: func(updateCh chan *model.PushRequest, expect func(partial, full int32)) {
updateCh <- &model.PushRequest{Full: true}
// Sleep past debounceAfter so the first request is no longer inside the debounce window.
time.Sleep(opts.debounceAfter + 10*time.Millisecond)
updateCh <- &model.PushRequest{Full: true}
expect(0, 2)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stopCh := make(chan struct{})
updateCh := make(chan *model.PushRequest)
// pushingCh has capacity 1 and acts as an "a full push is in progress" token.
pushingCh := make(chan struct{}, 1)
errCh := make(chan error, 1)
// Counters are updated by fakePush and read by expect, both via sync/atomic.
var partialPushes int32
var fullPushes int32
wg := sync.WaitGroup{}
// fakePush is the push callback handed to debounce. Full pushes take the
// pushingCh token, sleep for debounceMax*2 to simulate a slow push, and then
// release the token; if the token is already taken, an error is reported on
// errCh instead. Partial pushes only bump the counter.
fakePush := func(req *model.PushRequest) {
if req.Full {
select {
case pushingCh <- struct{}{}:
default:
errCh <- fmt.Errorf("multiple pushes happen simultaneously")
return
}
atomic.AddInt32(&fullPushes, 1)
time.Sleep(opts.debounceMax * 2)
<-pushingCh
} else {
atomic.AddInt32(&partialPushes, 1)
}
}
updateSent := uatomic.NewInt64(0)
wg.Add(1)
// Run debounce in the background; wg lets the subtest wait for it to return.
go func() {
debounce(updateCh, stopCh, opts, fakePush, updateSent)
wg.Done()
}()
// expect polls (via retry.UntilSuccess, with a timeout of debounceAfter*8 and a
// delay of debounceAfter/2) until the counters equal the expected values. Any
// error queued on errCh by fakePush is reported and fails the attempt.
expect := func(expectedPartial, expectedFull int32) {
t.Helper()
err := retry.UntilSuccess(func() error {
select {
case err := <-errCh:
t.Error(err)
return err
default:
partial := atomic.LoadInt32(&partialPushes)
full := atomic.LoadInt32(&fullPushes)
if partial != expectedPartial || full != expectedFull {
return fmt.Errorf("got %v full and %v partial, expected %v full and %v partial", full, partial, expectedFull, expectedPartial)
}
return nil
}
}, retry.Timeout(opts.debounceAfter*8), retry.Delay(opts.debounceAfter/2))
if err != nil {
t.Error(err)
}
}
// Send updates
tt.test(updateCh, expect)
// Signal debounce to stop and wait for its goroutine to return.
close(stopCh)
wg.Wait()
})
}
}
// TestShouldRespond is a table-driven test of DiscoveryServer.shouldRespond.
// Each case sets up a connection whose proxy has a given set of watched
// resources, sends a discovery request, and checks whether shouldRespond
// reports that a response is needed. For cases that expect a response (other
// than "reconnect"), it also checks that the watched resource's NonceAcked
// matches the request's ResponseNonce afterwards.
//
// Case names are unique so that a failing subtest can be identified
// unambiguously; the DubboServiceNameMappingType variants carry an "snp" suffix.
func TestShouldRespond(t *testing.T) {
	tests := []struct {
		name       string
		connection *Connection
		request    *discovery.DiscoveryRequest
		response   bool
	}{
		{
			name: "initial request",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl: v3.ClusterType,
			},
			response: true,
		},
		{
			name: "ack",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{
						v3.ClusterType: {
							VersionSent: "v1",
							NonceSent:   "nonce",
						},
					},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl:       v3.ClusterType,
				VersionInfo:   "v1",
				ResponseNonce: "nonce",
			},
			response: false,
		},
		{
			name: "nack",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{
						v3.ClusterType: {
							VersionSent: "v1",
							NonceSent:   "nonce",
						},
					},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl:       v3.ClusterType,
				VersionInfo:   "v1",
				ResponseNonce: "stale nonce",
			},
			response: false,
		},
		{
			name: "reconnect",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl:       v3.ClusterType,
				VersionInfo:   "v1",
				ResponseNonce: "reconnect nonce",
			},
			response: true,
		},
		{
			name: "resources change",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{
						v3.EndpointType: {
							VersionSent:   "v1",
							NonceSent:     "nonce",
							ResourceNames: []string{"cluster1"},
						},
					},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl:       v3.EndpointType,
				VersionInfo:   "v1",
				ResponseNonce: "nonce",
				ResourceNames: []string{"cluster1", "cluster2"},
			},
			response: true,
		},
		{
			name: "ack with same resources",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{
						v3.EndpointType: {
							VersionSent:   "v1",
							NonceSent:     "nonce",
							ResourceNames: []string{"cluster2", "cluster1"},
						},
					},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl:       v3.EndpointType,
				VersionInfo:   "v1",
				ResponseNonce: "nonce",
				ResourceNames: []string{"cluster1", "cluster2"},
			},
			response: false,
		},
		{
			name: "unsubscribe EDS",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{
						v3.EndpointType: {
							VersionSent:   "v1",
							NonceSent:     "nonce",
							ResourceNames: []string{"cluster2", "cluster1"},
						},
					},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl:       v3.EndpointType,
				VersionInfo:   "v1",
				ResponseNonce: "nonce",
				ResourceNames: []string{},
			},
			response: false,
		},
		{
			name: "resources change snp",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{
						v3.DubboServiceNameMappingType: {
							VersionSent:   "v1",
							NonceSent:     "nonce",
							ResourceNames: []string{"dubbo.demo.hello1"},
						},
					},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl:       v3.DubboServiceNameMappingType,
				VersionInfo:   "v1",
				ResponseNonce: "nonce",
				ResourceNames: []string{"dubbo.demo.hello1", "dubbo.demo.hello2"},
			},
			response: true,
		},
		{
			name: "ack with same resources snp",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{
						v3.DubboServiceNameMappingType: {
							VersionSent:   "v1",
							NonceSent:     "nonce",
							ResourceNames: []string{"dubbo.demo.hello1", "dubbo.demo.hello2"},
						},
					},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl:       v3.DubboServiceNameMappingType,
				VersionInfo:   "v1",
				ResponseNonce: "nonce",
				ResourceNames: []string{"dubbo.demo.hello1", "dubbo.demo.hello2"},
			},
			response: false,
		},
		{
			name: "unsubscribe snp",
			connection: &Connection{
				proxy: &model.Proxy{
					WatchedResources: map[string]*model.WatchedResource{
						v3.DubboServiceNameMappingType: {
							VersionSent:   "v1",
							NonceSent:     "nonce",
							ResourceNames: []string{"dubbo.demo.hello1", "dubbo.demo.hello2"},
						},
					},
				},
			},
			request: &discovery.DiscoveryRequest{
				TypeUrl:       v3.DubboServiceNameMappingType,
				VersionInfo:   "v1",
				ResponseNonce: "nonce",
				ResourceNames: []string{},
			},
			response: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s := NewFakeDiscoveryServer(t, FakeOptions{})
			if response, _ := s.Discovery.shouldRespond(tt.connection, tt.request); response != tt.response {
				t.Fatalf("Unexpected value for response, expected %v, got %v", tt.response, response)
			}
			// The reconnect case is exempt from the nonce bookkeeping check.
			if tt.name != "reconnect" && tt.response {
				// Guard the lookup so a missing entry fails the test cleanly
				// instead of panicking on a nil dereference.
				watched := tt.connection.proxy.WatchedResources[tt.request.TypeUrl]
				if watched == nil {
					t.Fatalf("No watched resource recorded for type %v", tt.request.TypeUrl)
				}
				if watched.NonceAcked != tt.request.ResponseNonce {
					t.Fatalf("Version & Nonce not updated properly")
				}
			}
		})
	}
}