blob: 9157923f89bc2827f6dd1f7a32c80f42563b07ea [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package istioagent
import (
"testing"
"time"
)
import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/grpc"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pilot/test/xdstest"
)
// TestXdsLeak is a regression test for https://github.com/istio/istio/issues/34097
func TestDeltaXdsLeak(t *testing.T) {
proxy := setupXdsProxyWithDownstreamOptions(t, []grpc.ServerOption{grpc.StreamInterceptor(xdstest.SlowServerInterceptor(time.Second, time.Second))})
f := xdstest.NewMockServer(t)
setDialOptions(proxy, f.Listener)
proxy.istiodDialOptions = append(proxy.istiodDialOptions, grpc.WithStreamInterceptor(xdstest.SlowClientInterceptor(0, time.Second*10)))
conn := setupDownstreamConnection(t, proxy)
downstream := deltaStream(t, conn)
sendDeltaDownstreamWithoutResponse(t, downstream)
for i := 0; i < 15; i++ {
// Send a bunch of responses from Istiod. These should not block, even though there are more sends than responseChan can hold
f.SendDeltaResponse(&discovery.DeltaDiscoveryResponse{TypeUrl: v3.ClusterType})
}
// Exit test, closing the connections. We should not have any goroutine leaks (checked by leak.CheckMain)
}
// sendDeltaDownstreamWithoutResponse sends a single delta discovery request for
// the cluster type on downstream and returns without reading any response.
// The test fails immediately if the send returns an error.
func sendDeltaDownstreamWithoutResponse(t *testing.T, downstream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient) {
	t.Helper()
	req := &discovery.DeltaDiscoveryRequest{
		TypeUrl: v3.ClusterType,
		Node:    &core.Node{Id: "sidecar~0.0.0.0~debug~cluster.local"},
	}
	if sendErr := downstream.Send(req); sendErr != nil {
		t.Fatal(sendErr)
	}
}
// TestDeltaXdsProxyBasicFlow validates the basic delta xDS proxy flow end to
// end: it wires the proxy to a fake discovery server, opens a downstream delta
// stream, and runs the request/response exchange performed by
// sendDeltaDownstreamWithNode.
func TestDeltaXdsProxyBasicFlow(t *testing.T) {
	xdsProxy := setupXdsProxy(t)
	discoveryServer := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
	setDialOptions(xdsProxy, discoveryServer.BufListener)

	stream := deltaStream(t, setupDownstreamConnection(t, xdsProxy))
	nodeMeta := model.NodeMetadata{
		Namespace:   "default",
		InstanceIPs: []string{"1.1.1.1"},
	}
	sendDeltaDownstreamWithNode(t, stream, nodeMeta)
}
// deltaStream opens a DeltaAggregatedResources stream over conn and returns the
// client side of it. The test fails immediately if the stream cannot be opened.
// The stream is created with the package-level ctx, which is declared outside
// this file.
func deltaStream(t *testing.T, conn *grpc.ClientConn) discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient {
	t.Helper()
	stream, err := discovery.NewAggregatedDiscoveryServiceClient(conn).DeltaAggregatedResources(ctx)
	if err != nil {
		t.Fatal(err)
	}
	return stream
}
// sendDeltaDownstreamWithNode drives two request/response exchanges on
// downstream using a node built from meta: first a cluster-type request, then a
// listener-type request. After each send it reads one response and requires it
// to be non-nil with a TypeUrl matching the request. Any send or receive error,
// or a mismatched response, fails the test immediately.
func sendDeltaDownstreamWithNode(t *testing.T, downstream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient, meta model.NodeMetadata) {
	t.Helper()
	proxyNode := &core.Node{
		Id:       "sidecar~1.1.1.1~debug~cluster.local",
		Metadata: meta.ToStruct(),
	}

	// roundTrip sends one request for typeURL and returns the next message read
	// from the stream. It is marked as a helper so failures are still
	// attributed to the caller of sendDeltaDownstreamWithNode.
	roundTrip := func(typeURL string) *discovery.DeltaDiscoveryResponse {
		t.Helper()
		if err := downstream.Send(&discovery.DeltaDiscoveryRequest{TypeUrl: typeURL, Node: proxyNode}); err != nil {
			t.Fatal(err)
		}
		resp, err := downstream.Recv()
		if err != nil {
			t.Fatal(err)
		}
		return resp
	}

	if cds := roundTrip(v3.ClusterType); cds == nil || cds.TypeUrl != v3.ClusterType {
		t.Fatalf("Expected to get cluster response but got %v", cds)
	}
	if lds := roundTrip(v3.ListenerType); lds == nil || lds.TypeUrl != v3.ListenerType {
		t.Fatalf("Expected to get listener response but got %v", lds)
	}
}