package grpcgen_test
import (
discovery ""
xdscreds ""
xdsgrpc ""
networking ""
security ""
echoproto ""
// Address of the Istiod gRPC service, used in tests.
var istiodSvcHost = "istiod.dubbo-system.svc.cluster.local"
// Local integration tests for proxyless gRPC.
// The tests will start an in-process Istiod, using the memory store, and use
// proxyless grpc servers and clients to validate the config generation.
// GRPC project has more extensive tests for each language, we mainly verify that Istiod
// generates the expected XDS, and gRPC tests verify that the XDS is correctly interpreted.
// verbose logs from gRPC side.
// GRPCBootstrap creates the bootstrap bytes dynamically.
// This can be used with NewXDSResolverWithConfigForTesting, and used when creating clients.
// See pkg/istio-agent/testdata/grpc-bootstrap.json for a sample bootstrap as expected by Istio agent.
func GRPCBootstrap(app, namespace, ip string, xdsPort int) []byte {
if ip == "" {
ip = ""
if namespace == "" {
namespace = "default"
if app == "" {
app = "app"
nodeID := "sidecar~" + ip + "~" + app + "." + namespace + "~" + namespace + ".svc.cluster.local"
bootstrap, err := grpcxds.GenerateBootstrap(grpcxds.GenerateBootstrapOptions{
Node: &model.Node{
ID: nodeID,
Metadata: &model.BootstrapNodeMetadata{
NodeMetadata: model.NodeMetadata{
Namespace: namespace,
Generator: "grpc",
ClusterID: "Kubernetes",
DiscoveryAddress: fmt.Sprintf("", xdsPort),
CertDir: path.Join(env.IstioSrc, "tests/testdata/certs/default"),
if err != nil {
return []byte{}
bootstrapBytes, err := json.Marshal(bootstrap)
if err != nil {
return []byte{}
return bootstrapBytes
// resolverForTest creates a resolver for xds:// names using dynamic bootstrap.
func resolverForTest(t test.Failer, xdsPort int, ns string) resolver.Builder {
xdsresolver, err := xdsgrpc.NewXDSResolverWithConfigForTesting(
GRPCBootstrap("foo", ns, "", xdsPort))
if err != nil {
return xdsresolver
func init() {
// Setup gRPC logging. Do it once in init to avoid races
o := log.DefaultOptions()
o.LogGrpc = true
func TestGRPC(t *testing.T) {
// Init Istiod in-process server.
ds := xds.NewXDS(make(chan struct{}))
sd := ds.DiscoveryServer.MemRegistry
sd.ClusterID = "Kubernetes"
lis, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("net.Listen failed: %v", err)
_, ports, _ := net.SplitHostPort(lis.Addr().String())
port, _ := strconv.Atoi(ports)
// Echo service
// initRBACTests(sd, store, "echo-rbac-plain", 14058, false)
initRBACTests(sd, ds.MemoryConfigStore, "echo-rbac-mtls", port, true)
xdsAddr, err := ds.StartGRPC("")
if err != nil {
defer ds.GRPCListener.Close()
_, xdsPorts, _ := net.SplitHostPort(xdsAddr)
xdsPort, _ := strconv.Atoi(xdsPorts)
addIstiod(sd, xdsPort)
env := ds.DiscoveryServer.Env
if err := env.PushContext.InitContext(env, env.PushContext, nil); err != nil {
// Client bootstrap - will show as "foo.clientns"
xdsresolver := resolverForTest(t, xdsPort, "clientns")
// Test the xdsresolver - query LDS and RDS for a specific service, wait for the update.
// Should be very fast (~20ms) and validate bootstrap and basic XDS connection.
// Unfortunately we have no way to look at the response except using the logs from XDS.
// This does not attempt to resolve CDS or EDS.
t.Run("gRPC-resolve", func(t *testing.T) {
rb := xdsresolver
stateCh := &Channel{ch: make(chan interface{}, 1)}
errorCh := &Channel{ch: make(chan interface{}, 1)}
_, err := rb.Build(resolver.Target{URL: url.URL{
Scheme: "xds",
Path: "/" + net.JoinHostPort(istiodSvcHost, xdsPorts),
&testClientConn{stateCh: stateCh, errorCh: errorCh}, resolver.BuildOptions{})
if err != nil {
t.Fatal("Failed to resolve XDS ", err)
tm := time.After(10 * time.Second)
select {
case s := <
t.Log("Got state ", s)
case e := <
t.Error("Error in resolve", e)
case <-tm:
t.Error("Didn't resolve in time")
t.Run("gRPC-svc", func(t *testing.T) {
t.Run("gRPC-svc-tls", func(t *testing.T) {
// Replaces: insecure.NewCredentials
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
if err != nil {
grpcOptions := []grpc.ServerOption{
bootstrapB := GRPCBootstrap("echo-rbac-mtls", "test", "", xdsPort)
grpcOptions = append(grpcOptions, xdsgrpc.BootstrapContentsForTesting(bootstrapB))
// Replaces: grpc NewServer
grpcServer := xdsgrpc.NewGRPCServer(grpcOptions...)
testRBAC(t, grpcServer, xdsresolver, "echo-rbac-mtls", port, lis)
t.Run("gRPC-dial", func(t *testing.T) {
for _, host := range []string{
} {
t.Run(host, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
conn, err := grpc.DialContext(ctx, "xds:///"+host+":"+xdsPorts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
if err != nil {
t.Fatal("XDS gRPC", err)
defer conn.Close()
s, err := discovery.NewAggregatedDiscoveryServiceClient(conn).StreamAggregatedResources(ctx)
if err != nil {
_ = s.Send(&discovery.DiscoveryRequest{})
func addIstiod(sd *memory.ServiceDiscovery, xdsPort int) {
Attributes: model.ServiceAttributes{
Name: "istiod",
Namespace: "dubbo-system",
Hostname: host.Name(istiodSvcHost),
DefaultAddress: "",
Ports: model.PortList{
Name: "grpc-main",
Port: xdsPort,
Protocol: protocol.GRPC, // SetEndpoints hardcodes this
sd.SetEndpoints(istiodSvcHost, "dubbo-system", []*model.IstioEndpoint{
Address: "",
EndpointPort: uint32(xdsPort),
ServicePortName: "grpc-main",
func initRBACTests(sd *memory.ServiceDiscovery, store model.ConfigStore, svcname string, port int, mtls bool) {
ns := "test"
hn := svcname + "." + ns + ".svc.cluster.local"
// The 'memory' store GetProxyServiceInstances uses the IP address of the node and endpoints to
// identify the service. In k8s store, labels are matched instead.
// For server configs to work, the server XDS bootstrap must match the IP.
// Required: namespace (otherwise DR matching fails)
Attributes: model.ServiceAttributes{
Name: svcname,
Namespace: ns,
Hostname: host.Name(hn),
DefaultAddress: "",
Ports: model.PortList{
Name: "grpc-main",
Port: port,
Protocol: protocol.GRPC,
// The address will be matched against the INSTANCE_IPS and id in the node id. If they match, the service is returned.
sd.SetEndpoints(hn, ns, []*model.IstioEndpoint{
Address: "",
EndpointPort: uint32(port),
ServicePortName: "grpc-main",
Meta: config.Meta{
GroupVersionKind: collections.IstioSecurityV1Beta1Authorizationpolicies.Resource().GroupVersionKind(),
Name: svcname,
Namespace: ns,
Spec: &security.AuthorizationPolicy{
Rules: []*security.Rule{
When: []*security.Condition{
Key: "request.headers[echo]",
Values: []string{
Action: security.AuthorizationPolicy_DENY,
Meta: config.Meta{
GroupVersionKind: collections.IstioSecurityV1Beta1Authorizationpolicies.Resource().GroupVersionKind(),
Name: svcname + "-allow",
Namespace: ns,
Spec: &security.AuthorizationPolicy{
Rules: []*security.Rule{
When: []*security.Condition{
Key: "request.headers[echo]",
Values: []string{
Action: security.AuthorizationPolicy_ALLOW,
if mtls {
// Client side.
_, _ = store.Create(config.Config{
Meta: config.Meta{
GroupVersionKind: collections.IstioNetworkingV1Alpha3Destinationrules.Resource().GroupVersionKind(),
Name: svcname,
Namespace: "test",
Spec: &networking.DestinationRule{
Host: svcname + ".test.svc.cluster.local",
TrafficPolicy: &networking.TrafficPolicy{Tls: &networking.ClientTLSSettings{
Mode: networking.ClientTLSSettings_ISTIO_MUTUAL,
// Server side.
_, _ = store.Create(config.Config{
Meta: config.Meta{
GroupVersionKind: collections.IstioSecurityV1Beta1Peerauthentications.Resource().GroupVersionKind(),
Name: svcname,
Namespace: "test",
Spec: &security.PeerAuthentication{
Mtls: &security.PeerAuthentication_MutualTLS{Mode: security.PeerAuthentication_MutualTLS_STRICT},
_, _ = store.Create(config.Config{
Meta: config.Meta{
GroupVersionKind: collections.IstioSecurityV1Beta1Authorizationpolicies.Resource().GroupVersionKind(),
Name: svcname,
Namespace: "test",
Spec: &security.AuthorizationPolicy{
Rules: []*security.Rule{
From: []*security.Rule_From{
Source: &security.Source{
Principals: []string{"evie"},
Action: security.AuthorizationPolicy_DENY,
func testRBAC(t *testing.T, grpcServer *xdsgrpc.GRPCServer, xdsresolver resolver.Builder, svcname string, port int, lis net.Listener) {
echos := &endpoint.EchoGrpcHandler{Config: endpoint.Config{Port: &common.Port{Port: port}}}
echoproto.RegisterEchoTestServiceServer(grpcServer, echos)
go func() {
err := grpcServer.Serve(lis)
if err != nil {
time.Sleep(3 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
creds, _ := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
FallbackCreds: insecure.NewCredentials(),
conn, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s.test.svc.cluster.local:%d", svcname, port),
if err != nil {
t.Fatal("XDS gRPC", err)
defer conn.Close()
echoc := echoproto.NewEchoTestServiceClient(conn)
md := metadata.New(map[string]string{"echo": "block"})
outctx := metadata.NewOutgoingContext(context.Background(), md)
_, err = echoc.Echo(outctx, &echoproto.EchoRequest{})
if err == nil {
t.Fatal("RBAC rule not enforced")
if status.Code(err) != codes.PermissionDenied {
t.Fatal("Unexpected error", err)
type Channel struct {
ch chan interface{}
// Send sends value on the underlying channel.
func (c *Channel) Send(value interface{}) { <- value
// From xds_resolver_test
// testClientConn is a fake implemetation of resolver.ClientConn. All is does
// is to store the state received from the resolver locally and signal that
// event through a channel.
type testClientConn struct {
stateCh *Channel
errorCh *Channel
func (t *testClientConn) UpdateState(s resolver.State) error {
return nil
func (t *testClientConn) ReportError(err error) {
func (t *testClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult {
// Will be called with something like:
// {"loadBalancingConfig":[
// {"xds_cluster_manager_experimental":{
// "children":{
// "cluster:outbound|14057||istiod.dubbo-system.svc.cluster.local":{
// "childPolicy":[
// {"cds_experimental":
// {"cluster":"outbound|14057||istiod.dubbo-system.svc.cluster.local"}}]}}}}]}
return &serviceconfig.ParseResult{}