// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package istioagent

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"path"
	"path/filepath"
	"testing"
	"time"

	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	wasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3"
	wasmv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3"
	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	google_rpc "google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/test/bufconn"
	"google.golang.org/protobuf/proto"

	extensions "istio.io/api/extensions/v1alpha1"
	networking "istio.io/api/networking/v1alpha3"

	"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/model/status"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
	v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
	"github.com/apache/dubbo-go-pixiu/pilot/test/xdstest"
	"github.com/apache/dubbo-go-pixiu/pkg/config"
	"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
	"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
	"github.com/apache/dubbo-go-pixiu/pkg/envoy"
	"github.com/apache/dubbo-go-pixiu/pkg/security"
	"github.com/apache/dubbo-go-pixiu/pkg/test"
	"github.com/apache/dubbo-go-pixiu/pkg/test/env"
	"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
)

// TestXdsLeak is a regression test for https://github.com/istio/istio/issues/34097
func TestXdsLeak(t *testing.T) {
proxy := setupXdsProxyWithDownstreamOptions(t, []grpc.ServerOption{grpc.StreamInterceptor(xdstest.SlowServerInterceptor(time.Second, time.Second))})
f := xdstest.NewMockServer(t)
setDialOptions(proxy, f.Listener)
proxy.istiodDialOptions = append(proxy.istiodDialOptions, grpc.WithStreamInterceptor(xdstest.SlowClientInterceptor(0, time.Second*10)))
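	// The slow server/client interceptors simulate a downstream that reads slowly,
	// so upstream responses arrive faster than they can be delivered; the leak check
	// (leak.CheckMain) verifies the proxy's forwarding path never strands a goroutine.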
conn := setupDownstreamConnection(t, proxy)
downstream := stream(t, conn)
sendDownstreamWithoutResponse(t, downstream)
for i := 0; i < 15; i++ {
// Send a bunch of responses from Istiod. These should not block, even though there are more sends than responseChan can hold
f.SendResponse(&discovery.DiscoveryResponse{TypeUrl: v3.ClusterType})
}
// Exit test, closing the connections. We should not have any goroutine leaks (checked by leak.CheckMain)
}

// sendDownstreamWithoutResponse sends a request without waiting for a response.
func sendDownstreamWithoutResponse(t *testing.T, downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient) {
t.Helper()
err := downstream.Send(&discovery.DiscoveryRequest{
TypeUrl: v3.ClusterType,
Node: &core.Node{
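			// Istio node IDs follow the Type~IPAddress~ID~Domain format.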
Id: "sidecar~0.0.0.0~debug~cluster.local",
},
})
if err != nil {
t.Fatal(err)
}
}

// Validates the basic xds proxy flow by proxying one CDS request end to end.
func TestXdsProxyBasicFlow(t *testing.T) {
proxy := setupXdsProxy(t)
f := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
setDialOptions(proxy, f.BufListener)
conn := setupDownstreamConnection(t, proxy)
downstream := stream(t, conn)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
}

// Validates the proxy health check status updates.
func TestXdsProxyHealthCheck(t *testing.T) {
healthy := &discovery.DiscoveryRequest{TypeUrl: v3.HealthInfoType}
unhealthy := &discovery.DiscoveryRequest{
TypeUrl: v3.HealthInfoType,
ErrorDetail: &google_rpc.Status{
Code: 500,
Message: "unhealthy",
},
}
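	// HealthInfoType requests carry the workload health: a nil ErrorDetail means
	// healthy, while a populated status marks the proxy unhealthy.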
node := model.NodeMetadata{
AutoRegisterGroup: "group",
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
}
proxy := setupXdsProxy(t)
f := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
if _, err := f.Store().Create(config.Config{
Meta: config.Meta{
Name: "group",
Namespace: "default",
GroupVersionKind: gvk.WorkloadGroup,
},
Spec: &networking.WorkloadGroup{
Template: &networking.WorkloadEntry{},
},
}); err != nil {
t.Fatal(err)
}
setDialOptions(proxy, f.BufListener)
conn := setupDownstreamConnection(t, proxy)
downstream := stream(t, conn)
	// Set up test helpers.
waitDisconnect := func() {
retry.UntilSuccessOrFail(t, func() error {
proxy.connectedMutex.Lock()
defer proxy.connectedMutex.Unlock()
if proxy.connected != nil {
return fmt.Errorf("still connected")
}
return nil
}, retry.Timeout(time.Second), retry.Delay(time.Millisecond))
}
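	// expectCondition polls the generated WorkloadEntry until its Healthy condition
	// matches expected; an empty string means no condition should be present.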
expectCondition := func(expected string) {
t.Helper()
retry.UntilSuccessOrFail(t, func() error {
cfg := f.Store().Get(gvk.WorkloadEntry, "group-1.1.1.1", "default")
if cfg == nil {
return fmt.Errorf("config not found")
}
con := status.GetConditionFromSpec(*cfg, status.ConditionHealthy)
if con == nil {
if expected == "" {
return nil
}
return fmt.Errorf("found no conditions, expected %v", expected)
}
if con.Status != expected {
return fmt.Errorf("expected status %q got %q", expected, con.Status)
}
return nil
}, retry.Timeout(time.Second*2))
}
	// Send CDS before the health check so that the WorkloadEntry gets registered.
coreNode := &core.Node{
Id: "sidecar~1.1.1.1~debug~cluster.local",
Metadata: node.ToStruct(),
}
err := downstream.Send(&discovery.DiscoveryRequest{TypeUrl: v3.ClusterType, Node: coreNode})
if err != nil {
t.Fatal(err)
}
_, err = downstream.Recv()
if err != nil {
t.Fatal(err)
}
	// A health check sent before LDS will not be forwarded upstream.
proxy.PersistRequest(healthy)
expectCondition("")
// simulate envoy send xds requests
sendDownstreamWithNode(t, downstream, node)
	// After LDS is sent, the cached health check will be resent.
expectCondition(status.StatusTrue)
// Flip status back and forth, ensure we update
proxy.PersistRequest(healthy)
expectCondition(status.StatusTrue)
proxy.PersistRequest(unhealthy)
expectCondition(status.StatusFalse)
proxy.PersistRequest(healthy)
expectCondition(status.StatusTrue)
// Completely disconnect
conn.Close()
downstream.CloseSend()
waitDisconnect()
conn = setupDownstreamConnection(t, proxy)
downstream = stream(t, conn)
sendDownstreamWithNode(t, downstream, node)
// Old status should remain
expectCondition(status.StatusTrue)
// And still update when we send new requests
proxy.PersistRequest(unhealthy)
expectCondition(status.StatusFalse)
// Send a new update while we are disconnected
conn.Close()
downstream.CloseSend()
waitDisconnect()
proxy.PersistRequest(healthy)
conn = setupDownstreamConnection(t, proxy)
downstream = stream(t, conn)
sendDownstreamWithNode(t, downstream, node)
// When we reconnect, our status update should still persist
expectCondition(status.StatusTrue)
// Confirm more updates are honored
proxy.PersistRequest(healthy)
expectCondition(status.StatusTrue)
proxy.PersistRequest(unhealthy)
expectCondition(status.StatusFalse)
// Disconnect and remove workload entry. This could happen if there is an outage and istiod cleans up
// the workload entry.
conn.Close()
downstream.CloseSend()
waitDisconnect()
f.Store().Delete(gvk.WorkloadEntry, "group-1.1.1.1", "default", nil)
proxy.PersistRequest(healthy)
conn = setupDownstreamConnection(t, proxy)
downstream = stream(t, conn)
sendDownstreamWithNode(t, downstream, node)
// When we reconnect, our status update should be re-applied
expectCondition(status.StatusTrue)
// Confirm more updates are honored
proxy.PersistRequest(unhealthy)
expectCondition(status.StatusFalse)
proxy.PersistRequest(healthy)
expectCondition(status.StatusTrue)
}
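
// healthRequest is a minimal illustrative sketch (not used by the tests above)
// showing the shape of the health-status requests built at the top of
// TestXdsProxyHealthCheck: the TypeUrl is always HealthInfoType, and ErrorDetail
// carries the failure details only when the proxy is unhealthy.
func healthRequest(errDetail *google_rpc.Status) *discovery.DiscoveryRequest {
	return &discovery.DiscoveryRequest{
		TypeUrl:     v3.HealthInfoType,
		ErrorDetail: errDetail, // nil means healthy
	}
}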
func setupXdsProxy(t *testing.T) *XdsProxy {
return setupXdsProxyWithDownstreamOptions(t, nil)
}
func setupXdsProxyWithDownstreamOptions(t *testing.T, opts []grpc.ServerOption) *XdsProxy {
secOpts := &security.Options{
FileMountedCerts: true,
}
proxyConfig := mesh.DefaultProxyConfig()
proxyConfig.DiscoveryAddress = "buffcon"
	// While the actual test runs on plain text, these are set up to build the default dial options;
	// without these it looks for the default cert location and fails.
proxyConfig.ProxyMetadata = map[string]string{
MetadataClientCertChain: path.Join(env.IstioSrc, "tests/testdata/certs/pilot/cert-chain.pem"),
MetadataClientCertKey: path.Join(env.IstioSrc, "tests/testdata/certs/pilot/key.pem"),
MetadataClientRootCert: path.Join(env.IstioSrc, "tests/testdata/certs/pilot/root-cert.pem"),
}
dir := t.TempDir()
ia := NewAgent(proxyConfig, &AgentOptions{
XdsUdsPath: filepath.Join(dir, "XDS"),
DownstreamGrpcOptions: opts,
}, secOpts, envoy.ProxyConfig{TestOnly: true})
t.Cleanup(func() {
ia.Close()
})
proxy, err := initXdsProxy(ia)
if err != nil {
t.Fatalf("Failed to initialize xds proxy %v", err)
}
ia.xdsProxy = proxy
return proxy
}
func setDialOptions(p *XdsProxy, l *bufconn.Listener) {
	// Override istiodDialOptions so that the test can connect in plain text over the bufconn listener.
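	// grpc.WithContextDialer routes every dial to the in-memory bufconn listener,
	// so no real network ports or TLS are involved.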
p.istiodDialOptions = []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return l.Dial()
}),
}
}
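
// ctx attaches the ClusterID metadata that downstream ADS streams send, mirroring
// how the agent identifies its cluster.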
var ctx = metadata.AppendToOutgoingContext(context.Background(), "ClusterID", "Kubernetes")

// Validates that the xds proxy recovers when the downstream stream, the downstream
// connection, or the upstream Istiod connection is closed and re-established.
func TestXdsProxyReconnects(t *testing.T) {
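	// waitDisconnect polls until the proxy reports no connected downstream stream.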
waitDisconnect := func(proxy *XdsProxy) {
retry.UntilSuccessOrFail(t, func() error {
proxy.connectedMutex.Lock()
defer proxy.connectedMutex.Unlock()
if proxy.connected != nil {
return fmt.Errorf("still connected")
}
return nil
}, retry.Timeout(time.Second), retry.Delay(time.Millisecond))
}
t.Run("Envoy close and open stream", func(t *testing.T) {
proxy := setupXdsProxy(t)
f := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
setDialOptions(proxy, f.BufListener)
conn := setupDownstreamConnection(t, proxy)
downstream := stream(t, conn)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
downstream.CloseSend()
waitDisconnect(proxy)
downstream = stream(t, conn)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
})
t.Run("Envoy opens multiple stream", func(t *testing.T) {
proxy := setupXdsProxy(t)
f := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
setDialOptions(proxy, f.BufListener)
conn := setupDownstreamConnection(t, proxy)
downstream := stream(t, conn)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
downstream = stream(t, conn)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
})
t.Run("Envoy closes connection", func(t *testing.T) {
proxy := setupXdsProxy(t)
f := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
setDialOptions(proxy, f.BufListener)
conn := setupDownstreamConnection(t, proxy)
downstream := stream(t, conn)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
conn.Close()
c := setupDownstreamConnection(t, proxy)
downstream = stream(t, c)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
})
t.Run("Envoy sends concurrent requests", func(t *testing.T) {
		// Envoy doesn't actually do this; in reality it maintains a single connection. However,
		// this ensures we are robust against cases where envoy rapidly disconnects and reconnects.
proxy := setupXdsProxy(t)
f := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
setDialOptions(proxy, f.BufListener)
conn := setupDownstreamConnection(t, proxy)
downstream := stream(t, conn)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
c := setupDownstreamConnection(t, proxy)
downstream = stream(t, c)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
conn.Close()
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
})
t.Run("Istiod closes connection", func(t *testing.T) {
proxy := setupXdsProxy(t)
f := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
		// Here we set up a real TCP listener (instead of an in-memory one) since we need to close
		// and re-open a listener on the same port, which the in-memory listener does not support.
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
proxy.istiodAddress = listener.Addr().String()
proxy.istiodDialOptions = []grpc.DialOption{grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())}
		// Set up the gRPC server.
grpcServer := grpc.NewServer()
t.Cleanup(grpcServer.Stop)
f.Discovery.Register(grpcServer)
go grpcServer.Serve(listener)
// Send initial request
conn := setupDownstreamConnection(t, proxy)
downstream := stream(t, conn)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
		// Stop the server and set up a new one. This simulates an Istiod pod being torn down.
grpcServer.Stop()
listener, err = net.Listen("tcp", listener.Addr().String())
if err != nil {
t.Fatal(err)
}
waitDisconnect(proxy)
grpcServer = grpc.NewServer()
t.Cleanup(grpcServer.Stop)
f.Discovery.Register(grpcServer)
go grpcServer.Serve(listener)
// Send downstream again
downstream = stream(t, conn)
sendDownstreamWithNode(t, downstream, model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
})
})
}

// fakeAckCache is a wasm cache stub whose Get always succeeds, simulating a
// successful wasm module fetch.
type fakeAckCache struct{}

func (f *fakeAckCache) Get(string, string, string, string, time.Duration, []byte, extensions.PullPolicy) (string, error) {
	return "test", nil
}
func (f *fakeAckCache) Cleanup() {}

// fakeNackCache is a wasm cache stub whose Get always fails, simulating a
// failed wasm module fetch.
type fakeNackCache struct{}

func (f *fakeNackCache) Get(string, string, string, string, time.Duration, []byte, extensions.PullPolicy) (string, error) {
	return "", errors.New("error")
}
func (f *fakeNackCache) Cleanup() {}
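
// TestECDSWasmConversion validates the proxy's ECDS handling: when the wasm cache
// resolves a module (fakeAckCache), the remote fetch config is rewritten to a local
// file reference before reaching the downstream; when the fetch fails
// (fakeNackCache), the proxy NACKs the ECDS update to the upstream instead.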
func TestECDSWasmConversion(t *testing.T) {
node := model.NodeMetadata{
Namespace: "default",
InstanceIPs: []string{"1.1.1.1"},
ClusterID: "Kubernetes",
}
proxy := setupXdsProxy(t)
// Reset wasm cache to a fake ACK cache.
proxy.wasmCache.Cleanup()
proxy.wasmCache = &fakeAckCache{}
// Initialize discovery server with an ECDS resource.
ef, err := os.ReadFile(path.Join(env.IstioSrc, "pilot/pkg/xds/testdata/ecds.yaml"))
if err != nil {
t.Fatal(err)
}
f := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{
ConfigString: string(ef),
})
setDialOptions(proxy, f.BufListener)
conn := setupDownstreamConnection(t, proxy)
downstream := stream(t, conn)
	// Send the first request. The wasm module async fetch should succeed, and the
	// downstream should receive a local-file-based wasm extension config.
err = downstream.Send(&discovery.DiscoveryRequest{
TypeUrl: v3.ExtensionConfigurationType,
ResourceNames: []string{"extension-config"},
Node: &core.Node{
Id: "sidecar~1.1.1.1~debug~cluster.local",
Metadata: node.ToStruct(),
},
})
if err != nil {
t.Fatal(err)
}
	// Verify that the response received has been rewritten to a local file reference.
gotResp, err := downstream.Recv()
if err != nil {
t.Fatal(err)
}
if len(gotResp.Resources) != 1 {
t.Errorf("xds proxy ecds wasm conversion number of received resource got %v want 1", len(gotResp.Resources))
}
gotEcdsConfig := &core.TypedExtensionConfig{}
if err := gotResp.Resources[0].UnmarshalTo(gotEcdsConfig); err != nil {
t.Fatalf("wasm config conversion output %v failed to unmarshal", gotResp.Resources[0])
}
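	// Build the expected rewritten config: the remote wasm fetch should have been
	// replaced with a local file whose name is the string returned by fakeAckCache.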
wasm := &wasm.Wasm{
Config: &wasmv3.PluginConfig{
Vm: &wasmv3.PluginConfig_VmConfig{
VmConfig: &wasmv3.VmConfig{
Code: &core.AsyncDataSource{Specifier: &core.AsyncDataSource_Local{
Local: &core.DataSource{
Specifier: &core.DataSource_Filename{
Filename: "test",
},
},
}},
},
},
},
}
wantEcdsConfig := &core.TypedExtensionConfig{
Name: "extension-config",
TypedConfig: util.MessageToAny(wasm),
}
if !proto.Equal(gotEcdsConfig, wantEcdsConfig) {
t.Errorf("xds proxy wasm config conversion got %v want %v", gotEcdsConfig, wantEcdsConfig)
}
v1 := proxy.ecdsLastAckVersion
n1 := proxy.ecdsLastNonce
	// Reset the wasm cache to a NACK cache, and recreate the xds server to simulate a version bump.
proxy.wasmCache = &fakeNackCache{}
f = xds.NewFakeDiscoveryServer(t, xds.FakeOptions{
ConfigString: string(ef),
})
setDialOptions(proxy, f.BufListener)
conn = setupDownstreamConnection(t, proxy)
downstream = stream(t, conn)
	// Send the request again; this time the xds proxy should return a NACK due to the NACK wasm cache.
err = downstream.Send(&discovery.DiscoveryRequest{
VersionInfo: gotResp.VersionInfo,
TypeUrl: v3.ExtensionConfigurationType,
ResourceNames: []string{"extension-config"},
Node: &core.Node{
Id: "sidecar~1.1.1.1~debug~cluster.local",
Metadata: node.ToStruct(),
},
})
if err != nil {
t.Fatal(err)
}
	// Wait until the nonce is updated, which indicates that an ACK/NACK has been received.
retry.UntilSuccessOrFail(t, func() error {
if proxy.ecdsLastNonce.Load() == n1.Load() {
			return errors.New("last processed nonce has not been updated; no ecds ack/nack received yet")
}
return nil
}, retry.Timeout(time.Second), retry.Delay(time.Millisecond))
	// Verify that the last ack version remains the same, which indicates the latest DiscoveryRequest was a NACK.
	v2 := proxy.ecdsLastAckVersion
	if v1.Load() != v2.Load() {
		t.Errorf("last ack ecds version was updated; expected it to remain the same, which indicates a nack for the ecds update")
	}
}
func stream(t *testing.T, conn *grpc.ClientConn) discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient {
t.Helper()
adsClient := discovery.NewAggregatedDiscoveryServiceClient(conn)
downstream, err := adsClient.StreamAggregatedResources(ctx)
if err != nil {
t.Fatal(err)
}
return downstream
}
func sendDownstreamWithNode(t *testing.T, downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, meta model.NodeMetadata) {
t.Helper()
node := &core.Node{
Id: "sidecar~1.1.1.1~debug~cluster.local",
Metadata: meta.ToStruct(),
}
err := downstream.Send(&discovery.DiscoveryRequest{TypeUrl: v3.ClusterType, Node: node})
if err != nil {
t.Fatal(err)
}
res, err := downstream.Recv()
if err != nil {
t.Fatal(err)
}
if res == nil || res.TypeUrl != v3.ClusterType {
t.Fatalf("Expected to get cluster response but got %v", res)
}
err = downstream.Send(&discovery.DiscoveryRequest{TypeUrl: v3.ListenerType, Node: node})
if err != nil {
t.Fatal(err)
}
res, err = downstream.Recv()
if err != nil {
t.Fatal(err)
}
if res == nil || res.TypeUrl != v3.ListenerType {
t.Fatalf("Expected to get listener response but got %v", res)
}
}
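
// recvTyped is an illustrative sketch (not used above) of the receive-and-check
// pattern repeated in sendDownstreamWithNode: receive one response and fail the
// test unless it carries the expected TypeUrl.
func recvTyped(t *testing.T, downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, typeURL string) *discovery.DiscoveryResponse {
	t.Helper()
	res, err := downstream.Recv()
	if err != nil {
		t.Fatal(err)
	}
	if res == nil || res.TypeUrl != typeURL {
		t.Fatalf("expected %v response but got %v", typeURL, res)
	}
	return res
}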
func setupDownstreamConnectionUDS(t test.Failer, path string) *grpc.ClientConn {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "unix", path)
}))
conn, err := grpc.Dial(path, opts...)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
conn.Close()
})
return conn
}
func setupDownstreamConnection(t *testing.T, proxy *XdsProxy) *grpc.ClientConn {
return setupDownstreamConnectionUDS(t, proxy.xdsUdsPath)
}