blob: a3c5295686a310dae9d7e7258235e028b8346c0d [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure_test
import (
"context"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/apache/skywalking-banyandb/api/event"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/pkg/test"
)
var _ = Describe("Metadata", func() {
var svcs *services
var deferFn func()
BeforeEach(func() {
svcs, deferFn = setUp()
})
AfterEach(func() {
deferFn()
})
Context("Manage group", func() {
It("should pass smoke test", func() {
Eventually(func() bool {
_, ok := svcs.measure.LoadGroup("sw_metric")
return ok
}).WithTimeout(10 * time.Second).Should(BeTrue())
})
It("should close the group", func() {
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
deleted, err := svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "sw_metric")
Expect(err).ShouldNot(HaveOccurred())
Expect(deleted).Should(BeTrue())
Eventually(func() bool {
_, ok := svcs.measure.LoadGroup("sw_metric")
return ok
}).WithTimeout(10 * time.Second).Should(BeFalse())
})
It("should add shards", func() {
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4)
groupSchema, err := svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "sw_metric")
Expect(err).ShouldNot(HaveOccurred())
Expect(groupSchema).ShouldNot(BeNil())
groupSchema.ResourceOpts.ShardNum = 4
Expect(svcs.metadataService.GroupRegistry().UpdateGroup(context.TODO(), groupSchema)).Should(Succeed())
Eventually(func() bool {
group, ok := svcs.measure.LoadGroup("sw_metric")
if !ok {
return false
}
return group.GetSchema().GetResourceOpts().GetShardNum() == 4
}).WithTimeout(10 * time.Second).Should(BeTrue())
})
})
Context("Manage measure", func() {
It("should pass smoke test", func() {
Eventually(func() bool {
_, err := svcs.measure.Measure(&commonv1.Metadata{
Name: "service_cpm_minute",
Group: "sw_metric",
})
return err == nil
}).WithTimeout(10 * time.Second).Should(BeTrue())
})
It("should close the measure", func() {
svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1)
deleted, err := svcs.metadataService.MeasureRegistry().DeleteMeasure(context.TODO(), &commonv1.Metadata{
Name: "service_cpm_minute",
Group: "sw_metric",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(deleted).Should(BeTrue())
Eventually(func() error {
_, err := svcs.measure.Measure(&commonv1.Metadata{
Name: "service_cpm_minute",
Group: "sw_metric",
})
return err
}).WithTimeout(30 * time.Second).Should(MatchError(measure.ErrMeasureNotExist))
})
Context("Update a measure", func() {
var measureSchema *databasev1.Measure
BeforeEach(func() {
var err error
measureSchema, err = svcs.metadataService.MeasureRegistry().GetMeasure(context.TODO(), &commonv1.Metadata{
Name: "service_cpm_minute",
Group: "sw_metric",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(measureSchema).ShouldNot(BeNil())
})
It("should update a new measure", func() {
svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1)
// Remove the first tag from the entity
measureSchema.Entity.TagNames = measureSchema.Entity.TagNames[1:]
entitySize := len(measureSchema.Entity.TagNames)
Expect(svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(), measureSchema)).Should(Succeed())
Eventually(func() bool {
val, err := svcs.measure.Measure(&commonv1.Metadata{
Name: "service_cpm_minute",
Group: "sw_metric",
})
if err != nil {
return false
}
return len(val.GetSchema().GetEntity().TagNames) == entitySize
}).WithTimeout(10 * time.Second).Should(BeTrue())
})
})
})
})