blob: d550a3e096361ce1ec56a18a694be1e0cc1750ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storage_test
import (
"context"
"fmt"
"io"
"reflect"
"testing"
"time"
"github.com/apache/dubbo-kubernetes/api/dds"
dubboapacheorgv1alpha1 "github.com/apache/dubbo-kubernetes/api/resource/v1alpha1"
dubbocp "github.com/apache/dubbo-kubernetes/pkg/config/app/dubbo-cp"
"github.com/apache/dubbo-kubernetes/pkg/core/endpoint"
"github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client"
"github.com/apache/dubbo-kubernetes/pkg/core/model"
"github.com/apache/dubbo-kubernetes/pkg/core/schema/collection"
"github.com/apache/dubbo-kubernetes/pkg/core/schema/collections"
"github.com/apache/dubbo-kubernetes/pkg/core/schema/gvk"
"github.com/apache/dubbo-kubernetes/pkg/dds/kube/crdclient"
"github.com/apache/dubbo-kubernetes/pkg/dds/storage"
"github.com/apache/dubbo-kubernetes/test/util/retry"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/anypb"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)
type fakeConnection struct {
sends []*dds.ObserveResponse
recvChan chan recvResult
disconnected bool
}
type recvResult struct {
request *dds.ObserveRequest
err error
}
func (f *fakeConnection) Send(targetRule *storage.VersionedRule, cr *storage.ClientStatus, response *dds.ObserveResponse) error {
cr.LastPushedTime = time.Now().Unix()
cr.LastPushedVersion = targetRule
cr.LastPushNonce = response.Nonce
cr.PushingStatus = storage.Pushing
f.sends = append(f.sends, response)
return nil
}
func (f *fakeConnection) Recv() (*dds.ObserveRequest, error) {
request := <-f.recvChan
return request.request, request.err
}
func (f *fakeConnection) Disconnect() {
f.disconnected = true
}
func TestStorage_CloseEOF(t *testing.T) {
t.Parallel()
s := storage.NewStorage(&dubbocp.Config{})
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "test",
}, fake)
fake.recvChan <- recvResult{
request: nil,
err: io.EOF,
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(s.Connection) != 0 {
t.Error("expected storage to be removed")
}
}
func TestStorage_CloseErr(t *testing.T) {
t.Parallel()
s := storage.NewStorage(&dubbocp.Config{})
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "test",
}, fake)
fake.recvChan <- recvResult{
request: nil,
err: fmt.Errorf("test"),
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(s.Connection) != 0 {
t.Error("expected storage to be removed")
}
}
func TestStorage_UnknowType(t *testing.T) {
t.Parallel()
s := storage.NewStorage(&dubbocp.Config{})
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "test",
}, fake)
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "",
Type: "test",
},
err: nil,
}
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "",
Type: "",
},
err: nil,
}
conn := s.Connection[0]
fake.recvChan <- recvResult{
request: nil,
err: io.EOF,
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(conn.TypeListened) != 0 {
t.Error("expected no type listened")
}
}
func TestStorage_StartNonEmptyNonce(t *testing.T) {
t.Parallel()
s := storage.NewStorage(&dubbocp.Config{})
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "test",
}, fake)
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "test",
Type: gvk.AuthenticationPolicy,
},
err: nil,
}
conn := s.Connection[0]
fake.recvChan <- recvResult{
request: nil,
err: io.EOF,
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(conn.TypeListened) != 0 {
t.Error("expected no type listened")
}
}
func TestStorage_Listen(t *testing.T) {
t.Parallel()
s := storage.NewStorage(&dubbocp.Config{})
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "test",
}, fake)
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "",
Type: gvk.AuthorizationPolicy,
},
err: nil,
}
conn := s.Connection[0]
fake.recvChan <- recvResult{
request: nil,
err: io.EOF,
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(conn.TypeListened) == 0 {
t.Error("expected type listened")
}
if !conn.TypeListened[gvk.AuthorizationPolicy] {
t.Error("expected type listened")
}
}
func makeClient(t *testing.T, schemas collection.Schemas) crdclient.ConfigStoreCache {
fake := client.NewFakeClient()
for _, s := range schemas.All() {
_, err := fake.Ext().ApiextensionsV1().CustomResourceDefinitions().Create(context.TODO(), &v1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group()),
},
}, metav1.CreateOptions{})
if err != nil {
return nil
}
}
stop := make(chan struct{})
config, err := crdclient.New(fake, "")
if err != nil {
t.Fatal(err)
}
go func() {
err := config.Start(stop)
if err != nil {
t.Error(err)
return
}
}()
_ = fake.Start(stop)
cache.WaitForCacheSync(stop, config.HasSynced)
t.Cleanup(func() {
close(stop)
})
return config
}
func TestStorage_PreNotify(t *testing.T) {
t.Parallel()
store := makeClient(t, collections.Rule)
configName := "name"
configNamespace := "namespace"
timeout := retry.Timeout(time.Second * 20)
for _, c := range collections.Rule.All() {
name := c.Resource().Kind()
t.Run(name, func(t *testing.T) {
r := c.Resource()
configMeta := model.Meta{
GroupVersionKind: r.GroupVersionKind(),
Name: configName,
}
if !r.IsClusterScoped() {
configMeta.Namespace = configNamespace
}
pb, err := r.NewInstance()
if err != nil {
t.Fatal(err)
}
if _, err := store.Create(model.Config{
Meta: configMeta,
Spec: pb,
}); err != nil {
t.Fatalf("Create(%v) => got %v", name, err)
}
// Kubernetes is eventually consistent, so we allow a short time to pass before we get
retry.UntilSuccessOrFail(t, func() error {
cfg := store.Get(r.GroupVersionKind(), configName, configMeta.Namespace)
if cfg == nil || !reflect.DeepEqual(cfg.Meta, configMeta) {
return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg)
}
return nil
}, timeout)
s := storage.NewStorage(&dubbocp.Config{})
handler := crdclient.NewHandler(s, "dubbo-demo", store)
err = handler.NotifyWithIndex(c)
if err != nil {
t.Fatal(err)
}
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "test",
}, fake)
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "",
Type: c.Resource().GroupVersionKind().String(),
},
err: nil,
}
assert.Eventually(t, func() bool {
return len(fake.sends) == 1
}, 10*time.Second, time.Millisecond)
if fake.sends[0].Type != c.Resource().GroupVersionKind().String() {
t.Error("expected rule type")
}
if fake.sends[0].Nonce == "" {
t.Error("expected non empty nonce")
}
if fake.sends[0].Data == nil {
t.Error("expected data")
}
if fake.sends[0].Revision != 1 {
t.Error("expected Rev 1")
}
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: fake.sends[0].Nonce,
Type: c.Resource().GroupVersionKind().String(),
},
err: nil,
}
conn := s.Connection[0]
assert.Eventually(t, func() bool {
return conn.ClientRules[c.Resource().GroupVersionKind().String()].PushingStatus == storage.Pushed
}, 10*time.Second, time.Millisecond)
fake.recvChan <- recvResult{
request: nil,
err: io.EOF,
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(conn.TypeListened) == 0 {
t.Error("expected type listened")
}
if !conn.TypeListened[c.Resource().GroupVersionKind().String()] {
t.Error("expected type listened")
}
})
}
}
func TestStorage_AfterNotify(t *testing.T) {
t.Parallel()
store := makeClient(t, collections.Rule)
configName := "name"
configNamespace := "namespace"
timeout := retry.Timeout(time.Second * 20)
for _, c := range collections.Rule.All() {
name := c.Resource().Kind()
t.Run(name, func(t *testing.T) {
r := c.Resource()
configMeta := model.Meta{
GroupVersionKind: r.GroupVersionKind(),
Name: configName,
}
if !r.IsClusterScoped() {
configMeta.Namespace = configNamespace
}
pb, err := r.NewInstance()
if err != nil {
t.Fatal(err)
}
if r.GroupVersionKind().String() == gvk.ServiceNameMapping {
mapping := pb.(*dubboapacheorgv1alpha1.ServiceNameMapping)
mapping.InterfaceName = "test"
mapping.ApplicationNames = []string{
"test-app",
}
}
if _, err := store.Create(model.Config{
Meta: configMeta,
Spec: pb,
}); err != nil {
t.Fatalf("Create(%v) => got %v", name, err)
}
// Kubernetes is eventually consistent, so we allow a short time to pass before we get
retry.UntilSuccessOrFail(t, func() error {
cfg := store.Get(r.GroupVersionKind(), configName, configMeta.Namespace)
if cfg == nil || !reflect.DeepEqual(cfg.Meta, configMeta) {
return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg)
}
return nil
}, timeout)
s := storage.NewStorage(&dubbocp.Config{})
handler := crdclient.NewHandler(s, "dubbo-demo", store)
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "test",
}, fake)
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "",
Type: c.Resource().GroupVersionKind().String(),
},
err: nil,
}
conn := s.Connection[0]
assert.Eventually(t, func() bool {
return conn.TypeListened[c.Resource().GroupVersionKind().String()]
}, 10*time.Second, time.Millisecond)
err = handler.NotifyWithIndex(c)
if err != nil {
t.Fatal(err)
}
assert.Eventually(t, func() bool {
return len(fake.sends) == 1
}, 10*time.Second, time.Millisecond)
if fake.sends[0].Type != c.Resource().GroupVersionKind().String() {
t.Error("expected rule type")
}
if fake.sends[0].Nonce == "" {
t.Error("expected non empty nonce")
}
if fake.sends[0].Data == nil {
t.Error("expected data")
}
if fake.sends[0].Revision != 1 {
t.Error("expected Rev 1")
}
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: fake.sends[0].Nonce,
Type: c.Resource().GroupVersionKind().String(),
},
err: nil,
}
assert.Eventually(t, func() bool {
return conn.ClientRules[c.Resource().GroupVersionKind().String()].PushingStatus == storage.Pushed
}, 10*time.Second, time.Millisecond)
fake.recvChan <- recvResult{
request: nil,
err: io.EOF,
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(conn.TypeListened) == 0 {
t.Error("expected type listened")
}
if !conn.TypeListened[c.Resource().GroupVersionKind().String()] {
t.Error("expected type listened")
}
})
}
}
func TestStore_MissNotify(t *testing.T) {
t.Parallel()
store := makeClient(t, collections.Rule)
configName := "name"
configNamespace := "namespace"
collection.NewSchemasBuilder().MustAdd(collections.DubboApacheOrgV1Alpha1TagRoute).Build()
tag := collections.DubboApacheOrgV1Alpha1TagRoute.Resource()
collection.NewSchemasBuilder().MustAdd(collections.DubboApacheOrgV1Alpha1ConditionRoute).Build()
condition := collections.DubboApacheOrgV1Alpha1ConditionRoute.Resource()
tagconfigMeta := model.Meta{
GroupVersionKind: tag.GroupVersionKind(),
Name: configName,
}
conditionConfigMeta := model.Meta{
GroupVersionKind: condition.GroupVersionKind(),
Name: configName,
}
if !tag.IsClusterScoped() {
tagconfigMeta.Namespace = configNamespace
}
tagpb, err := tag.NewInstance()
if err != nil {
t.Fatal(err)
}
conditionpb, err := condition.NewInstance()
if err != nil {
t.Fatal(err)
}
if _, err := store.Create(model.Config{
Meta: conditionConfigMeta,
Spec: conditionpb,
}); err != nil {
t.Fatalf("Create(%v) => got %v", condition.Kind(), err)
}
if _, err := store.Create(model.Config{
Meta: tagconfigMeta,
Spec: tagpb,
}); err != nil {
t.Fatalf("Create(%v) => got %v", tag.Kind(), err)
}
s := storage.NewStorage(&dubbocp.Config{})
tagHanlder := crdclient.NewHandler(s, "dubbo-demo", store)
conditionHandler := crdclient.NewHandler(s, "dubbo-demo", store)
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "test",
}, fake)
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "",
Type: condition.GroupVersionKind().String(),
},
err: nil,
}
conn := s.Connection[0]
assert.Eventually(t, func() bool {
return conn.TypeListened[condition.GroupVersionKind().String()]
}, 10*time.Second, time.Millisecond)
if err := conditionHandler.NotifyWithIndex(collections.DubboApacheOrgV1Alpha1ConditionRoute); err != nil {
t.Fatal(err)
}
if err := tagHanlder.NotifyWithIndex(collections.DubboApacheOrgV1Alpha1TagRoute); err != nil {
t.Fatal(err)
}
assert.Eventually(t, func() bool {
return len(fake.sends) == 1
}, 10*time.Second, time.Millisecond)
if fake.sends[0].Type != condition.GroupVersionKind().String() {
t.Error("expected rule type")
}
if fake.sends[0].Nonce == "" {
t.Error("expected non empty nonce")
}
if fake.sends[0].Data == nil {
t.Error("expected data")
}
if fake.sends[0].Revision != 1 {
t.Error("expected Rev 1")
}
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: fake.sends[0].Nonce,
Type: condition.GroupVersionKind().String(),
},
err: nil,
}
assert.Eventually(t, func() bool {
return conn.ClientRules[condition.GroupVersionKind().String()].PushingStatus == storage.Pushed
}, 10*time.Second, time.Millisecond)
fake.recvChan <- recvResult{
request: nil,
err: io.EOF,
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(conn.TypeListened) == 0 {
t.Error("expected type listened")
}
if !conn.TypeListened[condition.GroupVersionKind().String()] {
t.Error("expected type listened")
}
if len(fake.sends) != 1 {
t.Error("expected 1 send")
}
}
type fakeOrigin struct {
hash int
}
func (f *fakeOrigin) Type() string {
return gvk.TagRoute
}
func (f *fakeOrigin) Revision() int64 {
return 1
}
func (f *fakeOrigin) Exact(gen map[string]storage.DdsResourceGenerator, endpoint *endpoint.Endpoint) (*storage.VersionedRule, error) {
return &storage.VersionedRule{
Type: gvk.TagRoute,
Revision: 1,
Data: []*anypb.Any{},
}, nil
}
type errOrigin struct{}
func (e errOrigin) Type() string {
return gvk.TagRoute
}
func (e errOrigin) Revision() int64 {
return 1
}
func (e errOrigin) Exact(gen map[string]storage.DdsResourceGenerator, endpoint *endpoint.Endpoint) (*storage.VersionedRule, error) {
return nil, fmt.Errorf("test")
}
func TestStorage_MulitiNotify(t *testing.T) {
t.Parallel()
s := storage.NewStorage(&dubbocp.Config{})
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "test",
}, fake)
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "",
Type: gvk.TagRoute,
},
err: nil,
}
conn := s.Connection[0]
assert.Eventually(t, func() bool {
return conn.TypeListened[gvk.TagRoute]
}, 10*time.Second, time.Millisecond)
// should err
conn.RawRuleQueue.Add(&errOrigin{})
conn.RawRuleQueue.Add(&fakeOrigin{
hash: 1,
})
conn.RawRuleQueue.Add(&fakeOrigin{
hash: 2,
})
conn.RawRuleQueue.Add(&fakeOrigin{
hash: 3,
})
assert.Eventually(t, func() bool {
return len(fake.sends) == 1
}, 10*time.Second, time.Millisecond)
if fake.sends[0].Type != gvk.TagRoute {
t.Error("expected rule type")
}
if fake.sends[0].Nonce == "" {
t.Error("expected non empty nonce")
}
if fake.sends[0].Data == nil {
t.Error("expected data")
}
assert.Eventually(t, func() bool {
return conn.ClientRules[gvk.TagRoute].PushQueued
}, 10*time.Second, time.Millisecond)
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: fake.sends[0].Nonce,
Type: gvk.TagRoute,
},
err: nil,
}
assert.Eventually(t, func() bool {
return conn.ClientRules[gvk.TagRoute].PushingStatus == storage.Pushed
}, 10*time.Second, time.Millisecond)
assert.Eventually(t, func() bool {
return conn.RawRuleQueue.Len() == 0
}, 10*time.Second, time.Millisecond)
fake.recvChan <- recvResult{
request: nil,
err: io.EOF,
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(conn.TypeListened) == 0 {
t.Error("expected type listened")
}
if !conn.TypeListened[gvk.TagRoute] {
t.Error("expected type listened")
}
if len(fake.sends) != 1 {
t.Error("expected 1 send")
}
}
func TestStorage_Exact(t *testing.T) {
t.Parallel()
configName := "name"
configNamespace := "namespace"
for _, c := range collections.Rule.All() {
r := c.Resource()
name := c.Resource().Kind()
t.Run(name, func(t *testing.T) {
configMeta := model.Meta{
Name: configName,
Namespace: configNamespace,
GroupVersionKind: r.GroupVersionKind(),
}
if !r.IsClusterScoped() {
configMeta.Namespace = configNamespace
}
pb, err := r.NewInstance()
if err != nil {
t.Fatal(err)
}
if r.GroupVersionKind().String() == gvk.TagRoute {
route := pb.(*dubboapacheorgv1alpha1.TagRoute)
route.Key = "test-key"
route.Tags = []*dubboapacheorgv1alpha1.Tag{
{
Name: "zyq",
Addresses: []string{
"lxy",
},
},
}
}
origin := &storage.OriginImpl{
Gvk: r.GroupVersionKind().String(),
Rev: 1,
Data: []model.Config{
{
Meta: configMeta,
Spec: pb,
},
},
}
gen := map[string]storage.DdsResourceGenerator{}
gen[gvk.AuthenticationPolicy] = &storage.AuthenticationGenerator{}
gen[gvk.AuthorizationPolicy] = &storage.AuthorizationGenerator{}
gen[gvk.ServiceNameMapping] = &storage.ServiceMappingGenerator{}
gen[gvk.ConditionRoute] = &storage.ConditionRoutesGenerator{}
gen[gvk.TagRoute] = &storage.TagRoutesGenerator{}
gen[gvk.DynamicConfig] = &storage.DynamicConfigsGenerator{}
generated, err := origin.Exact(gen, &endpoint.Endpoint{})
assert.Nil(t, err)
assert.NotNil(t, generated)
assert.Equal(t, generated.Type, r.GroupVersionKind().String())
assert.Equal(t, generated.Revision, int64(1))
})
}
}
func TestStorage_ReturnMisNonce(t *testing.T) {
t.Parallel()
store := makeClient(t, collections.Rule)
configName := "name"
configNamespace := "namespace"
collection.NewSchemasBuilder().MustAdd(collections.DubboApacheOrgV1Alpha1TagRoute).Build()
tag := collections.DubboApacheOrgV1Alpha1TagRoute.Resource()
tagconfigMeta := model.Meta{
GroupVersionKind: tag.GroupVersionKind(),
Name: configName,
}
if !tag.IsClusterScoped() {
tagconfigMeta.Namespace = configNamespace
}
tagpb, err := tag.NewInstance()
if err != nil {
t.Fatal(err)
}
if _, err := store.Create(model.Config{
Meta: tagconfigMeta,
Spec: tagpb,
}); err != nil {
t.Fatalf("Create(%v) => got %v", tag.Kind(), err)
}
s := storage.NewStorage(&dubbocp.Config{})
tagHanlder := crdclient.NewHandler(s, "dubbo-system", store)
err = tagHanlder.NotifyWithIndex(collections.DubboApacheOrgV1Alpha1TagRoute)
if err != nil {
t.Fatal(err)
}
fake := &fakeConnection{
recvChan: make(chan recvResult, 1),
}
s.Connected(&endpoint.Endpoint{
ID: "TEST",
}, fake)
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "",
Type: gvk.TagRoute,
},
err: nil,
}
assert.Eventually(t, func() bool {
return len(fake.sends) == 1
}, 10*time.Second, time.Millisecond)
if fake.sends[0].Type != gvk.TagRoute {
t.Error("expected rule type")
}
if fake.sends[0].Nonce == "" {
t.Error("expected non empty nonce")
}
if fake.sends[0].Data == nil {
t.Error("expected data")
}
if fake.sends[0].Revision != 1 {
t.Error("expected revision 1")
}
fake.recvChan <- recvResult{
request: &dds.ObserveRequest{
Nonce: "test",
Type: gvk.TagRoute,
},
err: nil,
}
conn := s.Connection[0]
fake.recvChan <- recvResult{
request: nil,
err: io.EOF,
}
assert.Eventually(t, func() bool {
return fake.disconnected
}, 10*time.Second, time.Millisecond)
if len(conn.TypeListened) == 0 {
t.Error("expected type listened")
}
if !conn.TypeListened[gvk.TagRoute] {
t.Error("expected type listened")
}
if conn.ClientRules[gvk.TagRoute].PushingStatus == storage.Pushed {
t.Error("expected not pushed")
}
}