| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package storage_test |
| |
| import ( |
| "context" |
| "fmt" |
| "io" |
| "reflect" |
| "testing" |
| "time" |
| |
| "github.com/apache/dubbo-kubernetes/api/dds" |
| dubboapacheorgv1alpha1 "github.com/apache/dubbo-kubernetes/api/resource/v1alpha1" |
| dubbocp "github.com/apache/dubbo-kubernetes/pkg/config/app/dubbo-cp" |
| "github.com/apache/dubbo-kubernetes/pkg/core/endpoint" |
| "github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client" |
| "github.com/apache/dubbo-kubernetes/pkg/core/model" |
| "github.com/apache/dubbo-kubernetes/pkg/core/schema/collection" |
| "github.com/apache/dubbo-kubernetes/pkg/core/schema/collections" |
| "github.com/apache/dubbo-kubernetes/pkg/core/schema/gvk" |
| "github.com/apache/dubbo-kubernetes/pkg/dds/kube/crdclient" |
| "github.com/apache/dubbo-kubernetes/pkg/dds/storage" |
| "github.com/apache/dubbo-kubernetes/test/util/retry" |
| "github.com/stretchr/testify/assert" |
| "google.golang.org/protobuf/types/known/anypb" |
| v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/client-go/tools/cache" |
| ) |
| |
| type fakeConnection struct { |
| sends []*dds.ObserveResponse |
| recvChan chan recvResult |
| disconnected bool |
| } |
| |
| type recvResult struct { |
| request *dds.ObserveRequest |
| err error |
| } |
| |
| func (f *fakeConnection) Send(targetRule *storage.VersionedRule, cr *storage.ClientStatus, response *dds.ObserveResponse) error { |
| cr.LastPushedTime = time.Now().Unix() |
| cr.LastPushedVersion = targetRule |
| cr.LastPushNonce = response.Nonce |
| cr.PushingStatus = storage.Pushing |
| f.sends = append(f.sends, response) |
| return nil |
| } |
| |
| func (f *fakeConnection) Recv() (*dds.ObserveRequest, error) { |
| request := <-f.recvChan |
| |
| return request.request, request.err |
| } |
| |
| func (f *fakeConnection) Disconnect() { |
| f.disconnected = true |
| } |
| |
| func TestStorage_CloseEOF(t *testing.T) { |
| t.Parallel() |
| |
| s := storage.NewStorage(&dubbocp.Config{}) |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "test", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: io.EOF, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(s.Connection) != 0 { |
| t.Error("expected storage to be removed") |
| } |
| } |
| |
| func TestStorage_CloseErr(t *testing.T) { |
| t.Parallel() |
| |
| s := storage.NewStorage(&dubbocp.Config{}) |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "test", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: fmt.Errorf("test"), |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(s.Connection) != 0 { |
| t.Error("expected storage to be removed") |
| } |
| } |
| |
| func TestStorage_UnknowType(t *testing.T) { |
| t.Parallel() |
| |
| s := storage.NewStorage(&dubbocp.Config{}) |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "test", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "", |
| Type: "test", |
| }, |
| err: nil, |
| } |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "", |
| Type: "", |
| }, |
| err: nil, |
| } |
| |
| conn := s.Connection[0] |
| |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: io.EOF, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(conn.TypeListened) != 0 { |
| t.Error("expected no type listened") |
| } |
| } |
| |
| func TestStorage_StartNonEmptyNonce(t *testing.T) { |
| t.Parallel() |
| |
| s := storage.NewStorage(&dubbocp.Config{}) |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "test", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "test", |
| Type: gvk.AuthenticationPolicy, |
| }, |
| err: nil, |
| } |
| |
| conn := s.Connection[0] |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: io.EOF, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(conn.TypeListened) != 0 { |
| t.Error("expected no type listened") |
| } |
| } |
| |
| func TestStorage_Listen(t *testing.T) { |
| t.Parallel() |
| |
| s := storage.NewStorage(&dubbocp.Config{}) |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "test", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "", |
| Type: gvk.AuthorizationPolicy, |
| }, |
| err: nil, |
| } |
| |
| conn := s.Connection[0] |
| |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: io.EOF, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(conn.TypeListened) == 0 { |
| t.Error("expected type listened") |
| } |
| |
| if !conn.TypeListened[gvk.AuthorizationPolicy] { |
| t.Error("expected type listened") |
| } |
| } |
| |
| func makeClient(t *testing.T, schemas collection.Schemas) crdclient.ConfigStoreCache { |
| fake := client.NewFakeClient() |
| for _, s := range schemas.All() { |
| _, err := fake.Ext().ApiextensionsV1().CustomResourceDefinitions().Create(context.TODO(), &v1.CustomResourceDefinition{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group()), |
| }, |
| }, metav1.CreateOptions{}) |
| if err != nil { |
| return nil |
| } |
| } |
| stop := make(chan struct{}) |
| config, err := crdclient.New(fake, "") |
| if err != nil { |
| t.Fatal(err) |
| } |
| go func() { |
| err := config.Start(stop) |
| if err != nil { |
| t.Error(err) |
| return |
| } |
| }() |
| _ = fake.Start(stop) |
| cache.WaitForCacheSync(stop, config.HasSynced) |
| t.Cleanup(func() { |
| close(stop) |
| }) |
| return config |
| } |
| |
| func TestStorage_PreNotify(t *testing.T) { |
| t.Parallel() |
| |
| store := makeClient(t, collections.Rule) |
| configName := "name" |
| configNamespace := "namespace" |
| timeout := retry.Timeout(time.Second * 20) |
| for _, c := range collections.Rule.All() { |
| name := c.Resource().Kind() |
| t.Run(name, func(t *testing.T) { |
| r := c.Resource() |
| configMeta := model.Meta{ |
| GroupVersionKind: r.GroupVersionKind(), |
| Name: configName, |
| } |
| if !r.IsClusterScoped() { |
| configMeta.Namespace = configNamespace |
| } |
| |
| pb, err := r.NewInstance() |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| if _, err := store.Create(model.Config{ |
| Meta: configMeta, |
| Spec: pb, |
| }); err != nil { |
| t.Fatalf("Create(%v) => got %v", name, err) |
| } |
| // Kubernetes is eventually consistent, so we allow a short time to pass before we get |
| retry.UntilSuccessOrFail(t, func() error { |
| cfg := store.Get(r.GroupVersionKind(), configName, configMeta.Namespace) |
| if cfg == nil || !reflect.DeepEqual(cfg.Meta, configMeta) { |
| return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg) |
| } |
| return nil |
| }, timeout) |
| s := storage.NewStorage(&dubbocp.Config{}) |
| |
| handler := crdclient.NewHandler(s, "dubbo-demo", store) |
| err = handler.NotifyWithIndex(c) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "test", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "", |
| Type: c.Resource().GroupVersionKind().String(), |
| }, |
| err: nil, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return len(fake.sends) == 1 |
| }, 10*time.Second, time.Millisecond) |
| |
| if fake.sends[0].Type != c.Resource().GroupVersionKind().String() { |
| t.Error("expected rule type") |
| } |
| |
| if fake.sends[0].Nonce == "" { |
| t.Error("expected non empty nonce") |
| } |
| |
| if fake.sends[0].Data == nil { |
| t.Error("expected data") |
| } |
| |
| if fake.sends[0].Revision != 1 { |
| t.Error("expected Rev 1") |
| } |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: fake.sends[0].Nonce, |
| Type: c.Resource().GroupVersionKind().String(), |
| }, |
| err: nil, |
| } |
| |
| conn := s.Connection[0] |
| |
| assert.Eventually(t, func() bool { |
| return conn.ClientRules[c.Resource().GroupVersionKind().String()].PushingStatus == storage.Pushed |
| }, 10*time.Second, time.Millisecond) |
| |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: io.EOF, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(conn.TypeListened) == 0 { |
| t.Error("expected type listened") |
| } |
| |
| if !conn.TypeListened[c.Resource().GroupVersionKind().String()] { |
| t.Error("expected type listened") |
| } |
| }) |
| } |
| } |
| |
| func TestStorage_AfterNotify(t *testing.T) { |
| t.Parallel() |
| |
| store := makeClient(t, collections.Rule) |
| configName := "name" |
| configNamespace := "namespace" |
| timeout := retry.Timeout(time.Second * 20) |
| for _, c := range collections.Rule.All() { |
| name := c.Resource().Kind() |
| t.Run(name, func(t *testing.T) { |
| r := c.Resource() |
| configMeta := model.Meta{ |
| GroupVersionKind: r.GroupVersionKind(), |
| Name: configName, |
| } |
| if !r.IsClusterScoped() { |
| configMeta.Namespace = configNamespace |
| } |
| |
| pb, err := r.NewInstance() |
| if err != nil { |
| t.Fatal(err) |
| } |
| if r.GroupVersionKind().String() == gvk.ServiceNameMapping { |
| mapping := pb.(*dubboapacheorgv1alpha1.ServiceNameMapping) |
| mapping.InterfaceName = "test" |
| mapping.ApplicationNames = []string{ |
| "test-app", |
| } |
| } |
| if _, err := store.Create(model.Config{ |
| Meta: configMeta, |
| Spec: pb, |
| }); err != nil { |
| t.Fatalf("Create(%v) => got %v", name, err) |
| } |
| // Kubernetes is eventually consistent, so we allow a short time to pass before we get |
| retry.UntilSuccessOrFail(t, func() error { |
| cfg := store.Get(r.GroupVersionKind(), configName, configMeta.Namespace) |
| if cfg == nil || !reflect.DeepEqual(cfg.Meta, configMeta) { |
| return fmt.Errorf("get(%v) => got unexpected object %v", name, cfg) |
| } |
| return nil |
| }, timeout) |
| s := storage.NewStorage(&dubbocp.Config{}) |
| handler := crdclient.NewHandler(s, "dubbo-demo", store) |
| |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "test", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "", |
| Type: c.Resource().GroupVersionKind().String(), |
| }, |
| err: nil, |
| } |
| |
| conn := s.Connection[0] |
| |
| assert.Eventually(t, func() bool { |
| return conn.TypeListened[c.Resource().GroupVersionKind().String()] |
| }, 10*time.Second, time.Millisecond) |
| |
| err = handler.NotifyWithIndex(c) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| assert.Eventually(t, func() bool { |
| return len(fake.sends) == 1 |
| }, 10*time.Second, time.Millisecond) |
| |
| if fake.sends[0].Type != c.Resource().GroupVersionKind().String() { |
| t.Error("expected rule type") |
| } |
| |
| if fake.sends[0].Nonce == "" { |
| t.Error("expected non empty nonce") |
| } |
| |
| if fake.sends[0].Data == nil { |
| t.Error("expected data") |
| } |
| |
| if fake.sends[0].Revision != 1 { |
| t.Error("expected Rev 1") |
| } |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: fake.sends[0].Nonce, |
| Type: c.Resource().GroupVersionKind().String(), |
| }, |
| err: nil, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return conn.ClientRules[c.Resource().GroupVersionKind().String()].PushingStatus == storage.Pushed |
| }, 10*time.Second, time.Millisecond) |
| |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: io.EOF, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(conn.TypeListened) == 0 { |
| t.Error("expected type listened") |
| } |
| |
| if !conn.TypeListened[c.Resource().GroupVersionKind().String()] { |
| t.Error("expected type listened") |
| } |
| }) |
| } |
| } |
| |
| func TestStore_MissNotify(t *testing.T) { |
| t.Parallel() |
| |
| store := makeClient(t, collections.Rule) |
| configName := "name" |
| configNamespace := "namespace" |
| collection.NewSchemasBuilder().MustAdd(collections.DubboApacheOrgV1Alpha1TagRoute).Build() |
| tag := collections.DubboApacheOrgV1Alpha1TagRoute.Resource() |
| collection.NewSchemasBuilder().MustAdd(collections.DubboApacheOrgV1Alpha1ConditionRoute).Build() |
| condition := collections.DubboApacheOrgV1Alpha1ConditionRoute.Resource() |
| tagconfigMeta := model.Meta{ |
| GroupVersionKind: tag.GroupVersionKind(), |
| Name: configName, |
| } |
| conditionConfigMeta := model.Meta{ |
| GroupVersionKind: condition.GroupVersionKind(), |
| Name: configName, |
| } |
| if !tag.IsClusterScoped() { |
| tagconfigMeta.Namespace = configNamespace |
| } |
| |
| tagpb, err := tag.NewInstance() |
| if err != nil { |
| t.Fatal(err) |
| } |
| conditionpb, err := condition.NewInstance() |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| if _, err := store.Create(model.Config{ |
| Meta: conditionConfigMeta, |
| Spec: conditionpb, |
| }); err != nil { |
| t.Fatalf("Create(%v) => got %v", condition.Kind(), err) |
| } |
| |
| if _, err := store.Create(model.Config{ |
| Meta: tagconfigMeta, |
| Spec: tagpb, |
| }); err != nil { |
| t.Fatalf("Create(%v) => got %v", tag.Kind(), err) |
| } |
| |
| s := storage.NewStorage(&dubbocp.Config{}) |
| tagHanlder := crdclient.NewHandler(s, "dubbo-demo", store) |
| conditionHandler := crdclient.NewHandler(s, "dubbo-demo", store) |
| |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "test", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "", |
| Type: condition.GroupVersionKind().String(), |
| }, |
| err: nil, |
| } |
| |
| conn := s.Connection[0] |
| |
| assert.Eventually(t, func() bool { |
| return conn.TypeListened[condition.GroupVersionKind().String()] |
| }, 10*time.Second, time.Millisecond) |
| |
| if err := conditionHandler.NotifyWithIndex(collections.DubboApacheOrgV1Alpha1ConditionRoute); err != nil { |
| t.Fatal(err) |
| } |
| if err := tagHanlder.NotifyWithIndex(collections.DubboApacheOrgV1Alpha1TagRoute); err != nil { |
| t.Fatal(err) |
| } |
| |
| assert.Eventually(t, func() bool { |
| return len(fake.sends) == 1 |
| }, 10*time.Second, time.Millisecond) |
| |
| if fake.sends[0].Type != condition.GroupVersionKind().String() { |
| t.Error("expected rule type") |
| } |
| |
| if fake.sends[0].Nonce == "" { |
| t.Error("expected non empty nonce") |
| } |
| |
| if fake.sends[0].Data == nil { |
| t.Error("expected data") |
| } |
| |
| if fake.sends[0].Revision != 1 { |
| t.Error("expected Rev 1") |
| } |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: fake.sends[0].Nonce, |
| Type: condition.GroupVersionKind().String(), |
| }, |
| err: nil, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return conn.ClientRules[condition.GroupVersionKind().String()].PushingStatus == storage.Pushed |
| }, 10*time.Second, time.Millisecond) |
| |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: io.EOF, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(conn.TypeListened) == 0 { |
| t.Error("expected type listened") |
| } |
| |
| if !conn.TypeListened[condition.GroupVersionKind().String()] { |
| t.Error("expected type listened") |
| } |
| |
| if len(fake.sends) != 1 { |
| t.Error("expected 1 send") |
| } |
| } |
| |
| type fakeOrigin struct { |
| hash int |
| } |
| |
| func (f *fakeOrigin) Type() string { |
| return gvk.TagRoute |
| } |
| |
| func (f *fakeOrigin) Revision() int64 { |
| return 1 |
| } |
| |
| func (f *fakeOrigin) Exact(gen map[string]storage.DdsResourceGenerator, endpoint *endpoint.Endpoint) (*storage.VersionedRule, error) { |
| return &storage.VersionedRule{ |
| Type: gvk.TagRoute, |
| Revision: 1, |
| Data: []*anypb.Any{}, |
| }, nil |
| } |
| |
| type errOrigin struct{} |
| |
| func (e errOrigin) Type() string { |
| return gvk.TagRoute |
| } |
| |
| func (e errOrigin) Revision() int64 { |
| return 1 |
| } |
| |
| func (e errOrigin) Exact(gen map[string]storage.DdsResourceGenerator, endpoint *endpoint.Endpoint) (*storage.VersionedRule, error) { |
| return nil, fmt.Errorf("test") |
| } |
| |
| func TestStorage_MulitiNotify(t *testing.T) { |
| t.Parallel() |
| |
| s := storage.NewStorage(&dubbocp.Config{}) |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "test", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "", |
| Type: gvk.TagRoute, |
| }, |
| err: nil, |
| } |
| |
| conn := s.Connection[0] |
| |
| assert.Eventually(t, func() bool { |
| return conn.TypeListened[gvk.TagRoute] |
| }, 10*time.Second, time.Millisecond) |
| |
| // should err |
| conn.RawRuleQueue.Add(&errOrigin{}) |
| conn.RawRuleQueue.Add(&fakeOrigin{ |
| hash: 1, |
| }) |
| conn.RawRuleQueue.Add(&fakeOrigin{ |
| hash: 2, |
| }) |
| conn.RawRuleQueue.Add(&fakeOrigin{ |
| hash: 3, |
| }) |
| |
| assert.Eventually(t, func() bool { |
| return len(fake.sends) == 1 |
| }, 10*time.Second, time.Millisecond) |
| |
| if fake.sends[0].Type != gvk.TagRoute { |
| t.Error("expected rule type") |
| } |
| |
| if fake.sends[0].Nonce == "" { |
| t.Error("expected non empty nonce") |
| } |
| |
| if fake.sends[0].Data == nil { |
| t.Error("expected data") |
| } |
| |
| assert.Eventually(t, func() bool { |
| return conn.ClientRules[gvk.TagRoute].PushQueued |
| }, 10*time.Second, time.Millisecond) |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: fake.sends[0].Nonce, |
| Type: gvk.TagRoute, |
| }, |
| err: nil, |
| } |
| assert.Eventually(t, func() bool { |
| return conn.ClientRules[gvk.TagRoute].PushingStatus == storage.Pushed |
| }, 10*time.Second, time.Millisecond) |
| |
| assert.Eventually(t, func() bool { |
| return conn.RawRuleQueue.Len() == 0 |
| }, 10*time.Second, time.Millisecond) |
| |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: io.EOF, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(conn.TypeListened) == 0 { |
| t.Error("expected type listened") |
| } |
| |
| if !conn.TypeListened[gvk.TagRoute] { |
| t.Error("expected type listened") |
| } |
| |
| if len(fake.sends) != 1 { |
| t.Error("expected 1 send") |
| } |
| } |
| |
| func TestStorage_Exact(t *testing.T) { |
| t.Parallel() |
| |
| configName := "name" |
| configNamespace := "namespace" |
| for _, c := range collections.Rule.All() { |
| r := c.Resource() |
| name := c.Resource().Kind() |
| t.Run(name, func(t *testing.T) { |
| configMeta := model.Meta{ |
| Name: configName, |
| Namespace: configNamespace, |
| GroupVersionKind: r.GroupVersionKind(), |
| } |
| |
| if !r.IsClusterScoped() { |
| configMeta.Namespace = configNamespace |
| } |
| |
| pb, err := r.NewInstance() |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| if r.GroupVersionKind().String() == gvk.TagRoute { |
| route := pb.(*dubboapacheorgv1alpha1.TagRoute) |
| route.Key = "test-key" |
| route.Tags = []*dubboapacheorgv1alpha1.Tag{ |
| { |
| Name: "zyq", |
| Addresses: []string{ |
| "lxy", |
| }, |
| }, |
| } |
| } |
| |
| origin := &storage.OriginImpl{ |
| Gvk: r.GroupVersionKind().String(), |
| Rev: 1, |
| Data: []model.Config{ |
| { |
| Meta: configMeta, |
| Spec: pb, |
| }, |
| }, |
| } |
| |
| gen := map[string]storage.DdsResourceGenerator{} |
| gen[gvk.AuthenticationPolicy] = &storage.AuthenticationGenerator{} |
| gen[gvk.AuthorizationPolicy] = &storage.AuthorizationGenerator{} |
| gen[gvk.ServiceNameMapping] = &storage.ServiceMappingGenerator{} |
| gen[gvk.ConditionRoute] = &storage.ConditionRoutesGenerator{} |
| gen[gvk.TagRoute] = &storage.TagRoutesGenerator{} |
| gen[gvk.DynamicConfig] = &storage.DynamicConfigsGenerator{} |
| generated, err := origin.Exact(gen, &endpoint.Endpoint{}) |
| assert.Nil(t, err) |
| |
| assert.NotNil(t, generated) |
| assert.Equal(t, generated.Type, r.GroupVersionKind().String()) |
| assert.Equal(t, generated.Revision, int64(1)) |
| }) |
| } |
| } |
| |
| func TestStorage_ReturnMisNonce(t *testing.T) { |
| t.Parallel() |
| |
| store := makeClient(t, collections.Rule) |
| configName := "name" |
| configNamespace := "namespace" |
| collection.NewSchemasBuilder().MustAdd(collections.DubboApacheOrgV1Alpha1TagRoute).Build() |
| tag := collections.DubboApacheOrgV1Alpha1TagRoute.Resource() |
| tagconfigMeta := model.Meta{ |
| GroupVersionKind: tag.GroupVersionKind(), |
| Name: configName, |
| } |
| |
| if !tag.IsClusterScoped() { |
| tagconfigMeta.Namespace = configNamespace |
| } |
| |
| tagpb, err := tag.NewInstance() |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| if _, err := store.Create(model.Config{ |
| Meta: tagconfigMeta, |
| Spec: tagpb, |
| }); err != nil { |
| t.Fatalf("Create(%v) => got %v", tag.Kind(), err) |
| } |
| |
| s := storage.NewStorage(&dubbocp.Config{}) |
| tagHanlder := crdclient.NewHandler(s, "dubbo-system", store) |
| err = tagHanlder.NotifyWithIndex(collections.DubboApacheOrgV1Alpha1TagRoute) |
| if err != nil { |
| t.Fatal(err) |
| } |
| fake := &fakeConnection{ |
| recvChan: make(chan recvResult, 1), |
| } |
| |
| s.Connected(&endpoint.Endpoint{ |
| ID: "TEST", |
| }, fake) |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "", |
| Type: gvk.TagRoute, |
| }, |
| err: nil, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return len(fake.sends) == 1 |
| }, 10*time.Second, time.Millisecond) |
| |
| if fake.sends[0].Type != gvk.TagRoute { |
| t.Error("expected rule type") |
| } |
| if fake.sends[0].Nonce == "" { |
| t.Error("expected non empty nonce") |
| } |
| |
| if fake.sends[0].Data == nil { |
| t.Error("expected data") |
| } |
| if fake.sends[0].Revision != 1 { |
| t.Error("expected revision 1") |
| } |
| |
| fake.recvChan <- recvResult{ |
| request: &dds.ObserveRequest{ |
| Nonce: "test", |
| Type: gvk.TagRoute, |
| }, |
| err: nil, |
| } |
| |
| conn := s.Connection[0] |
| |
| fake.recvChan <- recvResult{ |
| request: nil, |
| err: io.EOF, |
| } |
| |
| assert.Eventually(t, func() bool { |
| return fake.disconnected |
| }, 10*time.Second, time.Millisecond) |
| |
| if len(conn.TypeListened) == 0 { |
| t.Error("expected type listened") |
| } |
| |
| if !conn.TypeListened[gvk.TagRoute] { |
| t.Error("expected type listened") |
| } |
| |
| if conn.ClientRules[gvk.TagRoute].PushingStatus == storage.Pushed { |
| t.Error("expected not pushed") |
| } |
| } |