blob: 3cfb23161aa12d58a3e3bcfde9763f230eb1f615 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds
import (
"context"
"sync"
"time"
)
import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
sds "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pkg/test"
)
// NewAdsTest returns an AdsTest backed by an aggregated discovery (ADS)
// stream opened on conn. The stream is created with context.Background();
// all remaining setup is delegated to NewXdsTest.
func NewAdsTest(t test.Failer, conn *grpc.ClientConn) *AdsTest {
	openStream := func(cc *grpc.ClientConn) (DiscoveryClient, error) {
		return discovery.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(context.Background())
	}
	return NewXdsTest(t, conn, openStream)
}
// NewSdsTest returns an AdsTest backed by a secret discovery (SDS) stream
// opened on conn. The stream is created with context.Background(), and the
// default request type is switched to v3.SecretType.
func NewSdsTest(t test.Failer, conn *grpc.ClientConn) *AdsTest {
	openStream := func(cc *grpc.ClientConn) (DiscoveryClient, error) {
		secrets := sds.NewSecretDiscoveryServiceClient(cc)
		return secrets.StreamSecrets(context.Background())
	}
	ads := NewXdsTest(t, conn, openStream)
	return ads.WithType(v3.SecretType)
}
// NewXdsTest builds an AdsTest around the stream returned by getClient(conn).
//
// If getClient fails, the test is failed through t.Fatal. Otherwise the
// helper is initialised with a default sidecar node ID, a one second timeout
// and v3.ClusterType as the default request type. Its Cleanup method is
// registered with t.Cleanup, and a goroutine is started that forwards
// everything received on the stream to the helper's channels.
func NewXdsTest(t test.Failer, conn *grpc.ClientConn, getClient func(conn *grpc.ClientConn) (DiscoveryClient, error)) *AdsTest {
	testCtx, stop := context.WithCancel(context.Background())

	stream, err := getClient(conn)
	if err != nil {
		t.Fatal(err)
	}

	ads := &AdsTest{
		// Identity and request defaults.
		ID:      "sidecar~1.1.1.1~test.default~default.svc.cluster.local",
		Type:    v3.ClusterType,
		timeout: time.Second,

		// Transport.
		client: stream,
		conn:   conn,

		// Lifecycle.
		context:       testCtx,
		cancelContext: stop,

		// Unbuffered hand-off channels fed by adsReceiveChannel.
		responses: make(chan *discovery.DiscoveryResponse),
		error:     make(chan error),
	}

	t.Cleanup(ads.Cleanup)
	go ads.adsReceiveChannel()
	return ads
}
// AdsTest is a test helper wrapping a single discovery stream. It sends
// requests on the stream and exposes received responses and errors through
// the Expect* methods.
type AdsTest struct {
// client is the stream used to send requests and receive responses.
client DiscoveryClient
// responses carries each response received by adsReceiveChannel.
responses chan *discovery.DiscoveryResponse
// error carries the error that terminated the receive loop.
error chan error
// conn is closed by Cleanup when it is non-nil.
conn *grpc.ClientConn
// metadata is converted to a struct and placed on the default request node.
metadata model.NodeMetadata
// ID is the node ID used when a request does not specify a node.
ID string
// Type is the type URL used when a request does not specify one.
Type string
// cancelOnce ensures the body of Cleanup runs at most once.
cancelOnce sync.Once
// context is cancelled by Cleanup; the receive loop and DrainResponses stop when it is done.
context context.Context
// cancelContext cancels context.
cancelContext context.CancelFunc
// timeout bounds how long ExpectResponse and ExpectError wait.
timeout time.Duration
}
// Cleanup cancels the helper's context, closes the send side of the stream
// and, when a connection was supplied, closes that connection. Errors from
// closing are deliberately ignored. Calling Cleanup more than once is safe:
// only the first call does any work.
func (a *AdsTest) Cleanup() {
	teardown := func() {
		a.cancelContext()
		_ = a.client.CloseSend()
		if a.conn == nil {
			return
		}
		_ = a.conn.Close()
	}
	// Guarded by a sync.Once so that concurrent callers cannot race on teardown.
	a.cancelOnce.Do(teardown)
}
// adsReceiveChannel pumps the stream: every received response is handed to
// a.responses, and the first receive error is handed to a.error, after which
// the loop ends. Each hand-off is abandoned if the context is cancelled first.
// A companion goroutine runs Cleanup as soon as the context is done.
func (a *AdsTest) adsReceiveChannel() {
	done := a.context.Done()

	go func() {
		<-done
		a.Cleanup()
	}()

	for {
		msg, recvErr := a.client.Recv()
		if recvErr == nil {
			select {
			case a.responses <- msg:
				continue
			case <-done:
				return
			}
		}

		// Only errors classified as unexpected are logged.
		if isUnexpectedError(recvErr) {
			log.Warnf("ads received error: %v", recvErr)
		}
		select {
		case a.error <- recvErr:
		case <-done:
		}
		return
	}
}
// DrainResponses reads and discards every response delivered by the receive
// loop, logging the type URL of each one. It blocks until the helper's
// context is cancelled.
func (a *AdsTest) DrainResponses() {
	for {
		select {
		case <-a.context.Done():
			return
		case r := <-a.responses:
			// Use the generated getter, which tolerates a nil message;
			// ExpectResponse likewise treats a nil response as possible.
			log.Infof("drained response %v", r.GetTypeUrl())
		}
	}
}
// ExpectResponse blocks until the next response arrives and returns it.
// The test is failed via t.Fatalf if the response is nil or carries no
// resources, if an error arrives instead, or if nothing arrives within the
// configured timeout.
func (a *AdsTest) ExpectResponse(t test.Failer) *discovery.DiscoveryResponse {
	t.Helper()
	expired := time.After(a.timeout)
	select {
	case got := <-a.responses:
		if got == nil || len(got.Resources) == 0 {
			t.Fatalf("got empty response")
		}
		return got
	case recvErr := <-a.error:
		t.Fatalf("got error: %v", recvErr)
	case <-expired:
		t.Fatalf("did not get response in time")
	}
	return nil
}
// ExpectError blocks until the receive loop reports an error and returns it.
// The test is failed via t.Fatalf if no error arrives within the configured
// timeout.
func (a *AdsTest) ExpectError(t test.Failer) error {
	t.Helper()
	expired := time.After(a.timeout)
	select {
	case recvErr := <-a.error:
		return recvErr
	case <-expired:
		t.Fatalf("did not get error in time")
	}
	return nil
}
// ExpectNoResponse asserts that the stream stays quiet: it watches for
// responses for 50 milliseconds and fails the test via t.Fatalf if one
// arrives in that window.
func (a *AdsTest) ExpectNoResponse(t test.Failer) {
	t.Helper()
	quietPeriod := time.After(time.Millisecond * 50)
	select {
	case unexpected := <-a.responses:
		t.Fatalf("got unexpected response: %v", unexpected)
	case <-quietPeriod:
	}
}
// fillInRequestDefaults returns req with unset fields populated from the
// helper: an empty TypeUrl becomes a.Type, and a missing Node is built from
// a.ID and a.metadata. A nil req is replaced by a fresh request; a non-nil
// req is modified in place and returned.
func (a *AdsTest) fillInRequestDefaults(req *discovery.DiscoveryRequest) *discovery.DiscoveryRequest {
	out := req
	if out == nil {
		out = new(discovery.DiscoveryRequest)
	}
	if len(out.TypeUrl) == 0 {
		out.TypeUrl = a.Type
	}
	if out.Node != nil {
		return out
	}
	out.Node = &core.Node{
		Id:       a.ID,
		Metadata: a.metadata.ToStruct(),
	}
	return out
}
// Request sends req on the stream after filling in any unset defaults
// (a nil req is allowed). A send failure fails the test via t.Fatal.
func (a *AdsTest) Request(t test.Failer, req *discovery.DiscoveryRequest) {
	t.Helper()
	sendErr := a.client.Send(a.fillInRequestDefaults(req))
	if sendErr != nil {
		t.Fatal(sendErr)
	}
}
// RequestResponseAck performs a complete xDS round trip: it sends req, waits
// for the response, then ACKs it by re-sending req with the response's nonce
// and version. The response is returned. A nil req is allowed; a non-nil req
// is modified in place.
func (a *AdsTest) RequestResponseAck(t test.Failer, req *discovery.DiscoveryRequest) *discovery.DiscoveryResponse {
	t.Helper()
	// Defaults are applied up front so that a nil req becomes a concrete
	// request that can be reused for the ACK below.
	ack := a.fillInRequestDefaults(req)
	a.Request(t, ack)

	received := a.ExpectResponse(t)

	ack.ResponseNonce, ack.VersionInfo = received.Nonce, received.VersionInfo
	a.Request(t, ack)
	return received
}
// RequestResponseNack does a full XDS exchange with an error: Send a request,
// get a response, and NACK the response. The NACK re-sends req with the
// response's nonce and an ErrorDetail set; VersionInfo is left as the caller
// provided it. The response is returned. A nil req is allowed; a non-nil req
// is modified in place.
func (a *AdsTest) RequestResponseNack(t test.Failer, req *discovery.DiscoveryRequest) *discovery.DiscoveryResponse {
	t.Helper()
	// Same up-front defaulting as RequestResponseAck, so a nil req becomes a
	// concrete request that can be reused for the NACK below.
	req = a.fillInRequestDefaults(req)
	a.Request(t, req)
	resp := a.ExpectResponse(t)
	req.ResponseNonce = resp.Nonce
	req.ErrorDetail = &status.Status{Message: "Test request NACK"}
	a.Request(t, req)
	return resp
}
// WithID sets the node ID used for requests that do not specify a node and
// returns the receiver so calls can be chained.
func (a *AdsTest) WithID(id string) *AdsTest {
a.ID = id
return a
}
// WithType sets the type URL used for requests that do not specify one and
// returns the receiver so calls can be chained.
func (a *AdsTest) WithType(typeURL string) *AdsTest {
a.Type = typeURL
return a
}
// WithMetadata sets the node metadata attached to requests that do not
// specify a node and returns the receiver so calls can be chained.
func (a *AdsTest) WithMetadata(m model.NodeMetadata) *AdsTest {
a.metadata = m
return a
}
// WithTimeout sets how long ExpectResponse and ExpectError wait before
// failing the test and returns the receiver so calls can be chained.
func (a *AdsTest) WithTimeout(t time.Duration) *AdsTest {
a.timeout = t
return a
}