blob: 795e4a374b296b4eb6927b60c1560fd3b59be11b [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package schema
import (
"context"
"embed"
"errors"
"fmt"
"os"
"path"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
)
const indexRuleDir = "testdata/index_rules"
var (
//go:embed testdata/index_rules/*.json
indexRuleStore embed.FS
//go:embed testdata/index_rule_binding.json
indexRuleBindingJSON string
//go:embed testdata/stream.json
streamJSON string
//go:embed testdata/group.json
groupJSON string
_ EventHandler = (*mockedEventHandler)(nil)
)
type mockedEventHandler struct {
mock.Mock
}
func (m *mockedEventHandler) OnAddOrUpdate(metadata Metadata) {
m.Called(metadata)
}
func (m *mockedEventHandler) OnDelete(metadata Metadata) {
m.Called(metadata)
}
func preloadSchema(e Registry) error {
g := &commonv1.Group{}
if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
return err
}
if err := e.CreateGroup(context.TODO(), g); err != nil {
return err
}
s := &databasev1.Stream{}
if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
return err
}
err := e.CreateStream(context.Background(), s)
if err != nil {
return err
}
indexRuleBinding := &databasev1.IndexRuleBinding{}
if err = protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); err != nil {
return err
}
err = e.CreateIndexRuleBinding(context.Background(), indexRuleBinding)
if err != nil {
return err
}
entries, err := indexRuleStore.ReadDir(indexRuleDir)
if err != nil {
return err
}
for _, entry := range entries {
data, err := indexRuleStore.ReadFile(indexRuleDir + "/" + entry.Name())
if err != nil {
return err
}
var idxRule databasev1.IndexRule
err = protojson.Unmarshal(data, &idxRule)
if err != nil {
return err
}
err = e.CreateIndexRule(context.Background(), &idxRule)
if err != nil {
return err
}
}
return nil
}
func randomTempDir() string {
return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", uuid.New().String()))
}
func useRandomTempDir() RegistryOption {
return func(config *etcdSchemaRegistryConfig) {
config.rootDir = randomTempDir()
}
}
func useRandomPort() RegistryOption {
return func(config *etcdSchemaRegistryConfig) {
ports, err := test.AllocateFreePorts(2)
if err != nil {
panic("fail to find free ports")
}
config.listenerClientURL, config.listenerPeerURL = fmt.Sprintf("http://127.0.0.1:%d", ports[0]), fmt.Sprintf("http://127.0.0.1:%d", ports[1])
}
}
func Test_Etcd_Entity_Get(t *testing.T) {
tester := assert.New(t)
registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir(), LoggerLevel("warn"))
tester.NoError(err)
tester.NotNil(registry)
defer registry.Close()
err = preloadSchema(registry)
tester.NoError(err)
tests := []struct {
name string
meta *commonv1.Metadata
get func(Registry, *commonv1.Metadata) (HasMetadata, error)
expectedErr bool
}{
{
name: "Get Group",
meta: &commonv1.Metadata{Name: "default"},
get: func(r Registry, meta *commonv1.Metadata) (HasMetadata, error) {
stm, innerErr := registry.GetGroup(context.TODO(), meta.GetName())
if innerErr != nil {
return nil, innerErr
}
return HasMetadata(stm), nil
},
},
{
name: "Get Stream",
meta: &commonv1.Metadata{Name: "sw", Group: "default"},
get: func(r Registry, meta *commonv1.Metadata) (HasMetadata, error) {
stm, innerErr := registry.GetStream(context.TODO(), meta)
if innerErr != nil {
return nil, innerErr
}
return HasMetadata(stm), nil
},
},
{
name: "Get IndexRuleBinding",
meta: &commonv1.Metadata{Name: "sw-index-rule-binding", Group: "default"},
get: func(r Registry, meta *commonv1.Metadata) (HasMetadata, error) {
e, innerErr := registry.GetIndexRuleBinding(context.TODO(), meta)
if innerErr != nil {
return nil, innerErr
}
return HasMetadata(e), nil
},
},
{
name: "Get IndexRule",
meta: &commonv1.Metadata{Name: "db.instance", Group: "default"},
get: func(r Registry, meta *commonv1.Metadata) (HasMetadata, error) {
e, innerErr := registry.GetIndexRule(context.TODO(), meta)
if innerErr != nil {
return nil, innerErr
}
return HasMetadata(e), nil
},
},
{
name: "Get unknown Measure",
meta: &commonv1.Metadata{Name: "unknown-stream", Group: "default"},
get: func(r Registry, meta *commonv1.Metadata) (HasMetadata, error) {
e, innerErr := registry.GetMeasure(context.TODO(), meta)
if innerErr != nil {
return nil, innerErr
}
return HasMetadata(e), nil
},
expectedErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := require.New(t)
entity, err := tt.get(registry, tt.meta)
if !tt.expectedErr {
req.NoError(err)
req.NotNil(entity)
req.Greater(entity.GetMetadata().GetCreateRevision(), int64(0))
req.Greater(entity.GetMetadata().GetModRevision(), int64(0))
req.Equal(entity.GetMetadata().GetGroup(), tt.meta.GetGroup())
req.Equal(entity.GetMetadata().GetName(), tt.meta.GetName())
} else {
req.Error(err)
}
})
}
}
func Test_Etcd_Entity_List(t *testing.T) {
tester := assert.New(t)
registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir(), LoggerLevel("warn"))
tester.NoError(err)
tester.NotNil(registry)
defer registry.Close()
err = preloadSchema(registry)
tester.NoError(err)
tests := []struct {
name string
list func(Registry) (int, error)
expectedLen int
}{
{
name: "List Group",
list: func(r Registry) (int, error) {
entities, innerErr := r.ListGroup(context.TODO())
if innerErr != nil {
return 0, innerErr
}
return len(entities), nil
},
expectedLen: 1,
},
{
name: "List Stream",
list: func(r Registry) (int, error) {
entities, innerErr := r.ListStream(context.TODO(), ListOpt{Group: "default"})
if innerErr != nil {
return 0, innerErr
}
return len(entities), nil
},
expectedLen: 1,
},
{
name: "List IndexRuleBinding",
list: func(r Registry) (int, error) {
entities, innerErr := r.ListIndexRuleBinding(context.TODO(), ListOpt{Group: "default"})
if innerErr != nil {
return 0, innerErr
}
return len(entities), nil
},
expectedLen: 1,
},
{
name: "List IndexRule",
list: func(r Registry) (int, error) {
entities, innerErr := r.ListIndexRule(context.TODO(), ListOpt{Group: "default"})
if innerErr != nil {
return 0, innerErr
}
return len(entities), nil
},
expectedLen: 10,
},
{
name: "List Measure",
list: func(r Registry) (int, error) {
entities, innerErr := r.ListMeasure(context.TODO(), ListOpt{Group: "default"})
if innerErr != nil {
return 0, innerErr
}
return len(entities), nil
},
expectedLen: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := require.New(t)
entitiesLen, listErr := tt.list(registry)
req.NoError(listErr)
req.Equal(entitiesLen, tt.expectedLen)
})
}
}
func Test_Etcd_Delete(t *testing.T) {
tester := assert.New(t)
registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir(), LoggerLevel("warn"))
tester.NoError(err)
tester.NotNil(registry)
defer registry.Close()
err = preloadSchema(registry)
tester.NoError(err)
tests := []struct {
name string
list func(Registry) (int, error)
delete func(Registry) error
expectedLenBefore int
expectedLenAfter int
}{
{
name: "Delete IndexRule",
list: func(r Registry) (int, error) {
entities, innerErr := r.ListIndexRule(context.TODO(), ListOpt{Group: "default"})
if innerErr != nil {
return 0, innerErr
}
return len(entities), nil
},
delete: func(r Registry) error {
_, innerErr := r.DeleteIndexRule(context.TODO(), &commonv1.Metadata{
Name: "db.instance",
Group: "default",
})
return innerErr
},
expectedLenBefore: 10,
expectedLenAfter: 9,
},
{
name: "Delete Group",
list: func(r Registry) (int, error) {
entities, innerErr := r.ListIndexRule(context.TODO(), ListOpt{Group: "default"})
if innerErr != nil {
return 0, innerErr
}
return len(entities), nil
},
delete: func(r Registry) error {
_, innerErr := r.DeleteGroup(context.TODO(), "default")
return innerErr
},
expectedLenBefore: 9,
expectedLenAfter: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ast := assert.New(t)
num, err := tt.list(registry)
ast.NoError(err)
ast.Equal(num, tt.expectedLenBefore)
err = tt.delete(registry)
ast.NoError(err)
num, err = tt.list(registry)
ast.NoError(err)
ast.Equal(num, tt.expectedLenAfter)
})
}
}
func Test_Notify(t *testing.T) {
req := require.New(t)
registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir(), LoggerLevel("warn"))
req.NoError(err)
req.NotNil(registry)
defer registry.Close()
err = preloadSchema(registry)
req.NoError(err)
tests := []struct {
name string
testFunc func(context.Context, Registry) error
validationFunc func(*mockedEventHandler) bool
}{
{
name: "modify indexRule",
testFunc: func(ctx context.Context, r Registry) error {
ir, err := r.GetIndexRule(ctx, &commonv1.Metadata{
Name: "db.instance",
Group: "default",
})
if err != nil {
return err
}
ir.Type = databasev1.IndexRule_TYPE_TREE
return r.UpdateIndexRule(ctx, ir)
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 1) &&
mocked.AssertNumberOfCalls(t, "OnDelete", 0)
},
},
{
name: "modify indexRule without modification",
testFunc: func(ctx context.Context, r Registry) error {
ir, err := r.GetIndexRule(ctx, &commonv1.Metadata{
Name: "db.instance",
Group: "default",
})
if err != nil {
return err
}
return r.UpdateIndexRule(ctx, ir)
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 0) &&
mocked.AssertNumberOfCalls(t, "OnDelete", 0)
},
},
{
name: "delete indexRule",
testFunc: func(ctx context.Context, r Registry) error {
deleted, err := r.DeleteIndexRule(ctx, &commonv1.Metadata{
Name: "db.instance",
Group: "default",
})
if !deleted {
return errors.New("fail to delete object")
}
return err
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 0) &&
mocked.AssertNumberOfCalls(t, "OnDelete", 1)
},
},
{
name: "update indexRuleBinding",
testFunc: func(ctx context.Context, r Registry) error {
irb, err := r.GetIndexRuleBinding(ctx, &commonv1.Metadata{
Name: "sw-index-rule-binding",
Group: "default",
})
if err != nil {
return err
}
irb.Rules = []string{"trace_id", "duration"}
return r.UpdateIndexRuleBinding(ctx, irb)
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 1) &&
mocked.AssertNumberOfCalls(t, "OnDelete", 0)
},
},
{
name: "update indexRuleBinding without modification",
testFunc: func(ctx context.Context, r Registry) error {
irb, err := r.GetIndexRuleBinding(ctx, &commonv1.Metadata{
Name: "sw-index-rule-binding",
Group: "default",
})
if err != nil {
return err
}
return r.UpdateIndexRuleBinding(ctx, irb)
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 0) &&
mocked.AssertNumberOfCalls(t, "OnDelete", 0)
},
},
{
name: "delete indexRuleBinding",
testFunc: func(ctx context.Context, r Registry) error {
_, err := r.DeleteIndexRuleBinding(ctx, &commonv1.Metadata{
Name: "sw-index-rule-binding",
Group: "default",
})
if err != nil {
return err
}
return nil
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 0) &&
mocked.AssertNumberOfCalls(t, "OnDelete", 1)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := require.New(t)
mockedObj := new(mockedEventHandler)
mockedObj.On("OnAddOrUpdate", mock.Anything).Return()
mockedObj.On("OnDelete", mock.Anything).Return()
registry.RegisterHandler(KindStream|KindIndexRuleBinding|KindIndexRule, mockedObj)
err := tt.testFunc(context.TODO(), registry)
req.NoError(err)
req.True(tt.validationFunc(mockedObj))
})
}
}