blob: bd9051e48609128c4f83d2bf73e5832d2a2b7f1b [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcgen_test
import (
"context"
"encoding/json"
"fmt"
"net"
"net/url"
"path"
"strconv"
"testing"
"time"
)
import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
xdsgrpc "google.golang.org/grpc/xds"
networking "istio.io/api/networking/v1alpha3"
security "istio.io/api/security/v1beta1"
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
"github.com/apache/dubbo-go-pixiu/pkg/istio-agent/grpcxds"
"github.com/apache/dubbo-go-pixiu/pkg/test"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo/common"
echoproto "github.com/apache/dubbo-go-pixiu/pkg/test/echo/proto"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo/server/endpoint"
"github.com/apache/dubbo-go-pixiu/pkg/test/env"
)
// Address of the Istiod gRPC service, used in tests.
var istiodSvcHost = "istiod.dubbo-system.svc.cluster.local"
// Local integration tests for proxyless gRPC.
// The tests will start an in-process Istiod, using the memory store, and use
// proxyless grpc servers and clients to validate the config generation.
// GRPC project has more extensive tests for each language, we mainly verify that Istiod
// generates the expected XDS, and gRPC tests verify that the XDS is correctly interpreted.
//
// To debug, set GRPC_GO_LOG_SEVERITY_LEVEL=info;GRPC_GO_LOG_VERBOSITY_LEVEL=99 for
// verbose logs from gRPC side.
// GRPCBootstrap creates the bootstrap bytes dynamically.
// This can be used with NewXDSResolverWithConfigForTesting, and used when creating clients.
//
// See pkg/istio-agent/testdata/grpc-bootstrap.json for a sample bootstrap as expected by Istio agent.
func GRPCBootstrap(app, namespace, ip string, xdsPort int) []byte {
if ip == "" {
ip = "127.0.0.1"
}
if namespace == "" {
namespace = "default"
}
if app == "" {
app = "app"
}
nodeID := "sidecar~" + ip + "~" + app + "." + namespace + "~" + namespace + ".svc.cluster.local"
bootstrap, err := grpcxds.GenerateBootstrap(grpcxds.GenerateBootstrapOptions{
Node: &model.Node{
ID: nodeID,
Metadata: &model.BootstrapNodeMetadata{
NodeMetadata: model.NodeMetadata{
Namespace: namespace,
Generator: "grpc",
ClusterID: "Kubernetes",
},
},
},
DiscoveryAddress: fmt.Sprintf("127.0.0.1:%d", xdsPort),
CertDir: path.Join(env.IstioSrc, "tests/testdata/certs/default"),
})
if err != nil {
return []byte{}
}
bootstrapBytes, err := json.Marshal(bootstrap)
if err != nil {
return []byte{}
}
return bootstrapBytes
}
// resolverForTest creates a resolver for xds:// names using dynamic bootstrap.
func resolverForTest(t test.Failer, xdsPort int, ns string) resolver.Builder {
xdsresolver, err := xdsgrpc.NewXDSResolverWithConfigForTesting(
GRPCBootstrap("foo", ns, "10.0.0.1", xdsPort))
if err != nil {
t.Fatal(err)
}
return xdsresolver
}
func init() {
// Setup gRPC logging. Do it once in init to avoid races
o := log.DefaultOptions()
o.LogGrpc = true
log.Configure(o)
}
func TestGRPC(t *testing.T) {
// Init Istiod in-process server.
ds := xds.NewXDS(make(chan struct{}))
sd := ds.DiscoveryServer.MemRegistry
sd.ClusterID = "Kubernetes"
lis, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("net.Listen failed: %v", err)
}
lis.Addr()
_, ports, _ := net.SplitHostPort(lis.Addr().String())
port, _ := strconv.Atoi(ports)
// Echo service
// initRBACTests(sd, store, "echo-rbac-plain", 14058, false)
initRBACTests(sd, ds.MemoryConfigStore, "echo-rbac-mtls", port, true)
xdsAddr, err := ds.StartGRPC("127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ds.GRPCListener.Close()
_, xdsPorts, _ := net.SplitHostPort(xdsAddr)
xdsPort, _ := strconv.Atoi(xdsPorts)
addIstiod(sd, xdsPort)
env := ds.DiscoveryServer.Env
env.Init()
if err := env.PushContext.InitContext(env, env.PushContext, nil); err != nil {
t.Fatal(err)
}
ds.DiscoveryServer.UpdateServiceShards(env.PushContext)
// Client bootstrap - will show as "foo.clientns"
xdsresolver := resolverForTest(t, xdsPort, "clientns")
// Test the xdsresolver - query LDS and RDS for a specific service, wait for the update.
// Should be very fast (~20ms) and validate bootstrap and basic XDS connection.
// Unfortunately we have no way to look at the response except using the logs from XDS.
// This does not attempt to resolve CDS or EDS.
t.Run("gRPC-resolve", func(t *testing.T) {
rb := xdsresolver
stateCh := &Channel{ch: make(chan interface{}, 1)}
errorCh := &Channel{ch: make(chan interface{}, 1)}
_, err := rb.Build(resolver.Target{URL: url.URL{
Scheme: "xds",
Path: "/" + net.JoinHostPort(istiodSvcHost, xdsPorts),
}},
&testClientConn{stateCh: stateCh, errorCh: errorCh}, resolver.BuildOptions{})
if err != nil {
t.Fatal("Failed to resolve XDS ", err)
}
tm := time.After(10 * time.Second)
select {
case s := <-stateCh.ch:
t.Log("Got state ", s)
case e := <-errorCh.ch:
t.Error("Error in resolve", e)
case <-tm:
t.Error("Didn't resolve in time")
}
})
t.Run("gRPC-svc", func(t *testing.T) {
t.Run("gRPC-svc-tls", func(t *testing.T) {
// Replaces: insecure.NewCredentials
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
if err != nil {
t.Fatal(err)
}
grpcOptions := []grpc.ServerOption{
grpc.Creds(creds),
}
bootstrapB := GRPCBootstrap("echo-rbac-mtls", "test", "127.0.1.1", xdsPort)
grpcOptions = append(grpcOptions, xdsgrpc.BootstrapContentsForTesting(bootstrapB))
// Replaces: grpc NewServer
grpcServer := xdsgrpc.NewGRPCServer(grpcOptions...)
testRBAC(t, grpcServer, xdsresolver, "echo-rbac-mtls", port, lis)
})
})
t.Run("gRPC-dial", func(t *testing.T) {
for _, host := range []string{
"istiod.dubbo-system.svc.cluster.local",
//"istiod.dubbo-system.svc",
//"istiod.dubbo-system",
//"istiod",
} {
t.Run(host, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
conn, err := grpc.DialContext(ctx, "xds:///"+host+":"+xdsPorts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
grpc.WithResolvers(xdsresolver))
if err != nil {
t.Fatal("XDS gRPC", err)
}
defer conn.Close()
s, err := discovery.NewAggregatedDiscoveryServiceClient(conn).StreamAggregatedResources(ctx)
if err != nil {
t.Fatal(err)
}
_ = s.Send(&discovery.DiscoveryRequest{})
})
}
})
}
func addIstiod(sd *memory.ServiceDiscovery, xdsPort int) {
sd.AddService(&model.Service{
Attributes: model.ServiceAttributes{
Name: "istiod",
Namespace: "dubbo-system",
},
Hostname: host.Name(istiodSvcHost),
DefaultAddress: "127.0.1.12",
Ports: model.PortList{
{
Name: "grpc-main",
Port: xdsPort,
Protocol: protocol.GRPC, // SetEndpoints hardcodes this
},
},
})
sd.SetEndpoints(istiodSvcHost, "dubbo-system", []*model.IstioEndpoint{
{
Address: "127.0.0.1",
EndpointPort: uint32(xdsPort),
ServicePortName: "grpc-main",
},
})
}
func initRBACTests(sd *memory.ServiceDiscovery, store model.ConfigStore, svcname string, port int, mtls bool) {
ns := "test"
hn := svcname + "." + ns + ".svc.cluster.local"
// The 'memory' store GetProxyServiceInstances uses the IP address of the node and endpoints to
// identify the service. In k8s store, labels are matched instead.
// For server configs to work, the server XDS bootstrap must match the IP.
sd.AddService(&model.Service{
// Required: namespace (otherwise DR matching fails)
Attributes: model.ServiceAttributes{
Name: svcname,
Namespace: ns,
},
Hostname: host.Name(hn),
DefaultAddress: "127.0.5.1",
Ports: model.PortList{
{
Name: "grpc-main",
Port: port,
Protocol: protocol.GRPC,
},
},
})
// The address will be matched against the INSTANCE_IPS and id in the node id. If they match, the service is returned.
sd.SetEndpoints(hn, ns, []*model.IstioEndpoint{
{
Address: "127.0.1.1",
EndpointPort: uint32(port),
ServicePortName: "grpc-main",
},
})
store.Create(config.Config{
Meta: config.Meta{
GroupVersionKind: collections.IstioSecurityV1Beta1Authorizationpolicies.Resource().GroupVersionKind(),
Name: svcname,
Namespace: ns,
},
Spec: &security.AuthorizationPolicy{
Rules: []*security.Rule{
{
When: []*security.Condition{
{
Key: "request.headers[echo]",
Values: []string{
"block",
},
},
},
},
},
Action: security.AuthorizationPolicy_DENY,
},
})
store.Create(config.Config{
Meta: config.Meta{
GroupVersionKind: collections.IstioSecurityV1Beta1Authorizationpolicies.Resource().GroupVersionKind(),
Name: svcname + "-allow",
Namespace: ns,
},
Spec: &security.AuthorizationPolicy{
Rules: []*security.Rule{
{
When: []*security.Condition{
{
Key: "request.headers[echo]",
Values: []string{
"allow",
},
},
},
},
},
Action: security.AuthorizationPolicy_ALLOW,
},
})
if mtls {
// Client side.
_, _ = store.Create(config.Config{
Meta: config.Meta{
GroupVersionKind: collections.IstioNetworkingV1Alpha3Destinationrules.Resource().GroupVersionKind(),
Name: svcname,
Namespace: "test",
},
Spec: &networking.DestinationRule{
Host: svcname + ".test.svc.cluster.local",
TrafficPolicy: &networking.TrafficPolicy{Tls: &networking.ClientTLSSettings{
Mode: networking.ClientTLSSettings_ISTIO_MUTUAL,
}},
},
})
// Server side.
_, _ = store.Create(config.Config{
Meta: config.Meta{
GroupVersionKind: collections.IstioSecurityV1Beta1Peerauthentications.Resource().GroupVersionKind(),
Name: svcname,
Namespace: "test",
},
Spec: &security.PeerAuthentication{
Mtls: &security.PeerAuthentication_MutualTLS{Mode: security.PeerAuthentication_MutualTLS_STRICT},
},
})
_, _ = store.Create(config.Config{
Meta: config.Meta{
GroupVersionKind: collections.IstioSecurityV1Beta1Authorizationpolicies.Resource().GroupVersionKind(),
Name: svcname,
Namespace: "test",
},
Spec: &security.AuthorizationPolicy{
Rules: []*security.Rule{
{
From: []*security.Rule_From{
{
Source: &security.Source{
Principals: []string{"evie"},
},
},
},
},
},
Action: security.AuthorizationPolicy_DENY,
},
})
}
}
func testRBAC(t *testing.T, grpcServer *xdsgrpc.GRPCServer, xdsresolver resolver.Builder, svcname string, port int, lis net.Listener) {
echos := &endpoint.EchoGrpcHandler{Config: endpoint.Config{Port: &common.Port{Port: port}}}
echoproto.RegisterEchoTestServiceServer(grpcServer, echos)
go func() {
err := grpcServer.Serve(lis)
if err != nil {
log.Errora(err)
}
}()
time.Sleep(3 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
creds, _ := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
FallbackCreds: insecure.NewCredentials(),
})
conn, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s.test.svc.cluster.local:%d", svcname, port),
grpc.WithTransportCredentials(creds),
grpc.WithBlock(),
grpc.WithResolvers(xdsresolver))
if err != nil {
t.Fatal("XDS gRPC", err)
}
defer conn.Close()
echoc := echoproto.NewEchoTestServiceClient(conn)
md := metadata.New(map[string]string{"echo": "block"})
outctx := metadata.NewOutgoingContext(context.Background(), md)
_, err = echoc.Echo(outctx, &echoproto.EchoRequest{})
if err == nil {
t.Fatal("RBAC rule not enforced")
}
if status.Code(err) != codes.PermissionDenied {
t.Fatal("Unexpected error", err)
}
t.Log(err)
}
type Channel struct {
ch chan interface{}
}
// Send sends value on the underlying channel.
func (c *Channel) Send(value interface{}) {
c.ch <- value
}
// From xds_resolver_test
// testClientConn is a fake implemetation of resolver.ClientConn. All is does
// is to store the state received from the resolver locally and signal that
// event through a channel.
type testClientConn struct {
resolver.ClientConn
stateCh *Channel
errorCh *Channel
}
func (t *testClientConn) UpdateState(s resolver.State) error {
t.stateCh.Send(s)
return nil
}
func (t *testClientConn) ReportError(err error) {
t.errorCh.Send(err)
}
func (t *testClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult {
// Will be called with something like:
// {"loadBalancingConfig":[
// {"xds_cluster_manager_experimental":{
// "children":{
// "cluster:outbound|14057||istiod.dubbo-system.svc.cluster.local":{
// "childPolicy":[
// {"cds_experimental":
// {"cluster":"outbound|14057||istiod.dubbo-system.svc.cluster.local"}}]}}}}]}
return &serviceconfig.ParseResult{}
}