blob: 9735e4fa0572d33e82afce5a6a41de9294b4530d [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package stream
import (
"bytes"
"embed"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"sort"
"strconv"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type (
	// shardStruct is one row of query output: the shard that served it, the
	// series/data-flow location inside that shard ("series_<id>",
	// "data_flow_<n>"), and the trace_id tag values of the elements found there.
	shardStruct struct {
		id       common.ShardID
		location []string
		elements []string
	}

	// shardsForTest is a query result set, compared as a whole against the
	// expected rows in the specs.
	shardsForTest []shardStruct
)
// The "Write" suite loads the "sw" stream of the "default" group and checks:
// shard selection by entity, series-local index queries (time range, filters,
// ordering) over testdata/multiple_shards.json, and global trace_id index
// lookups over testdata/global_index.json.
var _ = Describe("Write", func() {
	Context("Select shard", func() {
		var (
			s       *stream
			deferFn func()
		)
		BeforeEach(func() {
			var svcs *services
			svcs, deferFn = setUp()
			var ok bool
			s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
				Name:  "sw",
				Group: "default",
			})
			Expect(ok).To(BeTrue())
		})
		AfterEach(func() {
			deferFn()
		})
		tests := []struct {
			name         string
			entity       tsdb.Entity
			wantShardNum int
			wantErr      bool
		}{
			{
				name:         "all shards",
				wantShardNum: 2,
			},
			{
				name:         "select a shard",
				entity:       tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
				wantShardNum: 1,
			},
			{
				name:         "select shards",
				entity:       tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.AnyEntry, convert.Int64ToBytes(0)},
				wantShardNum: 2,
			},
		}
		for _, tt := range tests {
			// It only registers the closure; the spec body runs later. Take a
			// per-iteration copy so that, before Go 1.22, each spec does not
			// end up seeing the last table entry.
			tt := tt
			It(tt.name, func() {
				shards, err := s.Shards(tt.entity)
				if tt.wantErr {
					Expect(err).Should(HaveOccurred())
					return
				}
				Expect(err).ShouldNot(HaveOccurred())
				Expect(shards).To(HaveLen(tt.wantShardNum))
			})
		}
	})
	Context("Querying by local indices", Ordered, func() {
		var (
			s       *stream
			now     time.Time
			deferFn func()
		)
		BeforeAll(func() {
			var svcs *services
			svcs, deferFn = setUp()
			var ok bool
			s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
				Name:  "sw",
				Group: "default",
			})
			Expect(ok).To(BeTrue())
			now = setupQueryData("multiple_shards.json", s)
		})
		AfterAll(func() {
			deferFn()
		})
		When("", func() {
			// Expected locations: "series_<SeriesID>" plus the data flow index.
			l1 := []string{fmt.Sprintf("series_%d", tsdb.SeriesID(tsdb.Entity{
				tsdb.Entry("sw"),
				tsdb.Entry("webapp_id"),
				tsdb.Entry("10.0.0.5_id"),
				tsdb.Entry(convert.Int64ToBytes(0)),
			})), "data_flow_0"}
			l2 := []string{fmt.Sprintf("series_%d", tsdb.SeriesID(tsdb.Entity{
				tsdb.Entry("sw"),
				tsdb.Entry("webapp_id"),
				tsdb.Entry("10.0.0.1_id"),
				tsdb.Entry(convert.Int64ToBytes(1)),
			})), "data_flow_0"}
			l3 := []string{fmt.Sprintf("series_%d", tsdb.SeriesID(tsdb.Entity{
				tsdb.Entry("sw"),
				tsdb.Entry("webapp_id"),
				tsdb.Entry("10.0.0.3_id"),
				tsdb.Entry(convert.Int64ToBytes(1)),
			})), "data_flow_0"}
			l4 := []string{fmt.Sprintf("series_%d", tsdb.SeriesID(tsdb.Entity{
				tsdb.Entry("sw"),
				tsdb.Entry("webapp_id"),
				tsdb.Entry("10.0.0.1_id"),
				tsdb.Entry(convert.Int64ToBytes(0)),
			})), "data_flow_0"}
			DescribeTable("", func(args queryOpts, want shardsForTest, wantErr bool) {
				got, err := queryData(s, now, args)
				if wantErr {
					Expect(err).Should(HaveOccurred())
					return
				}
				Expect(err).ShouldNot(HaveOccurred())
				// Order rows by shard ID, then lexicographically by location, so
				// the comparison does not depend on shard/series iteration order.
				// This must be a strict weak ordering for sort to be well-defined.
				sort.SliceStable(got, func(i, j int) bool {
					a, b := got[i], got[j]
					if a.id != b.id {
						return a.id < b.id
					}
					for k := 0; k < len(a.location) && k < len(b.location); k++ {
						if c := bytes.Compare([]byte(a.location[k]), []byte(b.location[k])); c != 0 {
							return c < 0
						}
					}
					return len(a.location) < len(b.location)
				})
				Expect(got).To(Equal(want))
			},
				Entry(
					"all",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
							elements: []string{"4"},
						},
						{
							id:       1,
							location: l2,
							elements: []string{"1"},
						},
						{
							id:       1,
							location: l3,
							elements: []string{"2"},
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3", "5"},
						},
					},
					false,
				),
				Entry(
					"time range",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						offset:   1500 * time.Millisecond,
						duration: 1 * time.Hour,
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
							elements: []string{"4"},
						},
						{
							id:       1,
							location: l2,
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"5"},
						},
					},
					false,
				),
				Entry(
					"find series by service_id and instance_id",
					queryOpts{
						entity:   tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
						duration: 1 * time.Hour,
					},
					shardsForTest{
						{
							id:       1,
							location: l2,
							elements: []string{"1"},
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3", "5"},
						},
					},
					false,
				),
				Entry(
					"find a series",
					queryOpts{
						entity:   tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
						duration: 1 * time.Hour,
					},
					shardsForTest{
						{
							id:       1,
							location: l4,
							elements: []string{"3", "5"},
						},
					},
					false,
				),
				Entry(
					"filter",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							builder.Filter(&databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "endpoint_id",
									Group: "default",
									Id:    4,
								},
								Tags:     []string{"endpoint_id"},
								Type:     databasev1.IndexRule_TYPE_INVERTED,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}, tsdb.Condition{
								"endpoint_id": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_EQ,
										Values: [][]byte{[]byte("/home_id")},
									},
								},
							})
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
						},
						{
							id:       1,
							location: l2,
							elements: []string{"1"},
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3"},
						},
					},
					false,
				),
				Entry(
					"filter by status_code",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							rule := &databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "status_code",
									Group: "default",
									Id:    5,
								},
								Tags:     []string{"status_code"},
								Type:     databasev1.IndexRule_TYPE_INVERTED,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}
							builder.Filter(rule, tsdb.Condition{
								"status_code": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_EQ,
										Values: [][]byte{convert.Int64ToBytes(500)},
									},
								},
							})
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
						},
						{
							id:       1,
							location: l2,
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3", "5"},
						},
					},
					false,
				),
				Entry(
					"order by duration",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							builder.OrderByIndex(&databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "duration",
									Group: "default",
									Id:    3,
								},
								Tags:     []string{"duration"},
								Type:     databasev1.IndexRule_TYPE_TREE,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}, modelv1.Sort_SORT_ASC)
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
							elements: []string{"4"},
						},
						{
							id:       1,
							location: l2,
							elements: []string{"1"},
						},
						{
							id:       1,
							location: l3,
							elements: []string{"2"},
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3", "5"},
						},
					},
					false,
				),
				Entry(
					"filter by duration",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							rule := &databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "duration",
									Group: "default",
									Id:    3,
								},
								Tags:     []string{"duration"},
								Type:     databasev1.IndexRule_TYPE_TREE,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}
							builder.Filter(rule, tsdb.Condition{
								"duration": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_LT,
										Values: [][]byte{convert.Int64ToBytes(500)},
									},
								},
							})
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
							elements: []string{"4"},
						},
						{
							id:       1,
							location: l2,
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3", "5"},
						},
					},
					false,
				),
				Entry(
					"filter and sort by duration",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							rule := &databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "duration",
									Group: "default",
									Id:    3,
								},
								Tags:     []string{"duration"},
								Type:     databasev1.IndexRule_TYPE_TREE,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}
							builder.Filter(rule, tsdb.Condition{
								"duration": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_LT,
										Values: [][]byte{convert.Int64ToBytes(500)},
									},
								},
							})
							builder.OrderByIndex(rule, modelv1.Sort_SORT_ASC)
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
							elements: []string{"4"},
						},
						{
							id:       1,
							location: l2,
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3", "5"},
						},
					},
					false,
				),
				Entry(
					"filter by several conditions",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							rule := &databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "duration",
									Group: "default",
									Id:    3,
								},
								Tags:     []string{"duration"},
								Type:     databasev1.IndexRule_TYPE_TREE,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}
							builder.Filter(rule, tsdb.Condition{
								"duration": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_LT,
										Values: [][]byte{convert.Int64ToBytes(500)},
									},
								},
							})
							builder.Filter(&databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "endpoint_id",
									Group: "default",
									Id:    4,
								},
								Tags:     []string{"endpoint_id"},
								Type:     databasev1.IndexRule_TYPE_INVERTED,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}, tsdb.Condition{
								"endpoint_id": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_EQ,
										Values: [][]byte{[]byte("/home_id")},
									},
								},
							})
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
						},
						{
							id:       1,
							location: l2,
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3"},
						},
					},
					false,
				),
				Entry(
					"filter by several conditions, sort by duration",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							rule := &databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "duration",
									Group: "default",
									Id:    3,
								},
								Tags:     []string{"duration"},
								Type:     databasev1.IndexRule_TYPE_TREE,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}
							builder.Filter(rule, tsdb.Condition{
								"duration": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_LT,
										Values: [][]byte{convert.Int64ToBytes(500)},
									},
								},
							})
							builder.OrderByIndex(rule, modelv1.Sort_SORT_ASC)
							builder.Filter(&databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "endpoint_id",
									Group: "default",
									Id:    4,
								},
								Tags:     []string{"endpoint_id"},
								Type:     databasev1.IndexRule_TYPE_INVERTED,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}, tsdb.Condition{
								"endpoint_id": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_EQ,
										Values: [][]byte{[]byte("/home_id")},
									},
								},
							})
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
						},
						{
							id:       1,
							location: l2,
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3"},
						},
					},
					false,
				),
				Entry(
					"filter by extended tags c",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							rule := &databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "extended_tags",
									Group: "default",
									Id:    11,
								},
								Tags:     []string{"extended_tags"},
								Type:     databasev1.IndexRule_TYPE_INVERTED,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}
							builder.Filter(rule, tsdb.Condition{
								"extended_tags": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_HAVING,
										Values: [][]byte{[]byte("c")},
									},
								},
							})
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
							elements: []string{"4"},
						},
						{
							id:       1,
							location: l2,
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"3", "5"},
						},
					},
					false,
				),
				Entry(
					"filter by extended tags abc",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							rule := &databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "extended_tags",
									Group: "default",
									Id:    11,
								},
								Tags:     []string{"extended_tags"},
								Type:     databasev1.IndexRule_TYPE_INVERTED,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}
							builder.Filter(rule, tsdb.Condition{
								"extended_tags": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_HAVING,
										Values: [][]byte{[]byte("c"), []byte("a"), []byte("b")},
									},
								},
							})
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
						},
						{
							id:       1,
							location: l2,
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"5"},
						},
					},
					false,
				),
				Entry(
					"filter by extended tags bc",
					queryOpts{
						entity:   tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
						duration: 1 * time.Hour,
						buildFn: func(builder tsdb.SeekerBuilder) {
							rule := &databasev1.IndexRule{
								Metadata: &commonv1.Metadata{
									Name:  "extended_tags",
									Group: "default",
									Id:    11,
								},
								Tags:     []string{"extended_tags"},
								Type:     databasev1.IndexRule_TYPE_INVERTED,
								Location: databasev1.IndexRule_LOCATION_SERIES,
							}
							builder.Filter(rule, tsdb.Condition{
								"extended_tags": []index.ConditionValue{
									{
										Op:     modelv1.Condition_BINARY_OP_HAVING,
										Values: [][]byte{[]byte("c"), []byte("b")},
									},
								},
							})
						},
					},
					shardsForTest{
						{
							id:       0,
							location: l1,
							elements: []string{"4"},
						},
						{
							id:       1,
							location: l2,
						},
						{
							id:       1,
							location: l3,
						},
						{
							id:       1,
							location: l4,
							elements: []string{"5"},
						},
					},
					false,
				),
			)
		})
	})
	Context("Querying by global indices", Ordered, func() {
		var (
			s       *stream
			deferFn func()
		)
		BeforeAll(func() {
			var svcs *services
			svcs, deferFn = setUp()
			var ok bool
			s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
				Name:  "sw",
				Group: "default",
			})
			Expect(ok).To(BeTrue())
			_ = setupQueryData("global_index.json", s)
		})
		AfterAll(func() {
			deferFn()
		})
		// Looks traceID up in every shard's global index (rule ID 10, trace_id),
		// checks each hit's trace_id tag, and compares the total hit count.
		DescribeTable("", func(traceID string, wantTraceSegmentNum int, wantErr bool) {
			shards, errShards := s.Shards(nil)
			Expect(errShards).ShouldNot(HaveOccurred())
			err := func() error {
				itemSize := 0
				for _, shard := range shards {
					itemIDs, err := shard.Index().Seek(index.Field{
						Key: index.FieldKey{
							SeriesID: tsdb.GlobalSeriesID(tsdb.Entry(s.name)),
							// trace_id
							IndexRuleID: 10,
						},
						Term: []byte(traceID),
					})
					if err != nil {
						return errors.WithStack(err)
					}
					if len(itemIDs) == 0 {
						continue
					}
					itemSize += len(itemIDs)
					for _, itemID := range itemIDs {
						segShard, err := s.Shard(itemID.ShardID)
						if err != nil {
							return errors.WithStack(err)
						}
						series, err := segShard.Series().GetByID(itemID.SeriesID)
						if err != nil {
							return errors.WithStack(err)
						}
						err = func() error {
							item, closer, errInner := series.Get(itemID)
							// The closer is deferred before the error check, so
							// guard against a nil one (e.g. when Get fails).
							defer func(closer io.Closer) {
								if closer != nil {
									_ = closer.Close()
								}
							}(closer)
							if errInner != nil {
								return errors.WithStack(errInner)
							}
							tagFamily, errInner := s.ParseTagFamily("searchable", item)
							if errInner != nil {
								return errors.WithStack(errInner)
							}
							for _, tag := range tagFamily.GetTags() {
								if tag.GetKey() == "trace_id" {
									Expect(tag.GetValue().GetStr().GetValue()).To(Equal(traceID))
								}
							}
							return nil
						}()
						if err != nil {
							return errors.WithStack(err)
						}
					}
				}
				Expect(itemSize).To(Equal(wantTraceSegmentNum))
				return nil
			}()
			if wantErr {
				Expect(err).Should(HaveOccurred())
				return
			}
			Expect(err).ShouldNot(HaveOccurred())
		},
			Entry(
				"trace id is 1",
				"1",
				2,
				false,
			),
			Entry(
				"trace id is 2",
				"2",
				3,
				false,
			),
			Entry(
				"unknown trace id",
				"foo",
				0,
				false,
			),
		)
	})
})
// queryOpts describes a single series query issued by queryData.
type queryOpts struct {
	// entity selects the shards and series to scan; the specs pass
	// tsdb.AnyEntry in positions they do not want to constrain.
	entity tsdb.Entity
	// offset is added to the base time to get the start of the queried range.
	offset time.Duration
	// duration is the length of the inclusive time range that starts at
	// base time + offset.
	duration time.Duration
	// buildFn, when non-nil, customizes the seeker (filters, ordering) before
	// it is built.
	buildFn func(builder tsdb.SeekerBuilder)
}

// queryData runs the query described by opts against every shard of s that
// matches opts.entity. It returns one shardStruct for each (series, data flow)
// pair the seeker produced.
//
// Each row records the shard ID, the location ("series_<id>", "data_flow_<n>"),
// and the trace_id tag values of the elements the iterator yielded, in
// iteration order. The time range is the inclusive range of length
// opts.duration starting at baseTime+opts.offset.
//
// Any error from resolving shards, listing series, opening spans, building or
// running the seeker, or parsing elements is returned. Spans and iterators are
// closed on every path.
func queryData(s *stream, baseTime time.Time, opts queryOpts) (shardsForTest, error) {
	shards, err := s.Shards(opts.entity)
	if err != nil {
		return nil, err
	}
	got := shardsForTest{}
	for _, shard := range shards {
		seriesList, err := shard.Series().List(tsdb.NewPath(opts.entity))
		if err != nil {
			return nil, err
		}
		for _, series := range seriesList {
			// Scope each series in its own function so its deferred closes run
			// before the next series is opened.
			got, err = func(g shardsForTest) (shardsForTest, error) {
				sp, errInner := series.Span(timestamp.NewInclusiveTimeRangeDuration(baseTime.Add(opts.offset), opts.duration))
				defer func(sp tsdb.SeriesSpan) {
					if sp != nil {
						_ = sp.Close()
					}
				}(sp)
				if errInner != nil {
					return nil, errInner
				}
				builder := sp.SeekerBuilder()
				if opts.buildFn != nil {
					opts.buildFn(builder)
				}
				seeker, errInner := builder.Build()
				if errInner != nil {
					return nil, errInner
				}
				iter, errInner := seeker.Seek()
				if errInner != nil {
					return nil, errInner
				}
				// Close every iterator, including the ones not yet consumed when
				// an element fails to parse part-way through. Defers run LIFO,
				// so the iterators are closed before the span.
				defer func() {
					for _, iterator := range iter {
						_ = iterator.Close()
					}
				}()
				for dataFlowID, iterator := range iter {
					var elements []string
					for iterator.Next() {
						tagFamily, errInner := s.ParseTagFamily("searchable", iterator.Val())
						if errInner != nil {
							return nil, errInner
						}
						for _, tag := range tagFamily.GetTags() {
							if tag.GetKey() == "trace_id" {
								elements = append(elements, tag.GetValue().GetStr().GetValue())
							}
						}
						eleID, errInner := s.ParseElementID(iterator.Val())
						if errInner != nil {
							return nil, errInner
						}
						Expect(eleID).ShouldNot(BeEmpty())
					}
					g = append(g, shardStruct{
						id: shard.ID(),
						location: []string{
							fmt.Sprintf("series_%v", series.ID()),
							"data_flow_" + strconv.Itoa(dataFlowID),
						},
						elements: elements,
					})
				}
				return g, nil
			}(got)
			if err != nil {
				return nil, err
			}
		}
	}
	return got, nil
}
//go:embed testdata/*.json
var dataFS embed.FS
func setupQueryData(dataFile string, stream *stream) (baseTime time.Time) {
var templates []interface{}
baseTime = timestamp.NowMilli()
content, err := dataFS.ReadFile("testdata/" + dataFile)
Expect(err).ShouldNot(HaveOccurred())
Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
for i, template := range templates {
rawSearchTagFamily, errMarshal := json.Marshal(template)
Expect(errMarshal).ShouldNot(HaveOccurred())
searchTagFamily := &modelv1.TagFamilyForWrite{}
Expect(protojson.Unmarshal(rawSearchTagFamily, searchTagFamily)).ShouldNot(HaveOccurred())
e := &streamv1.ElementValue{
ElementId: strconv.Itoa(i),
Timestamp: timestamppb.New(baseTime.Add(500 * time.Millisecond * time.Duration(i))),
TagFamilies: []*modelv1.TagFamilyForWrite{
{
Tags: []*modelv1.TagValue{
{
Value: &modelv1.TagValue_BinaryData{
BinaryData: bb,
},
},
},
},
},
}
e.TagFamilies = append(e.TagFamilies, searchTagFamily)
errInner := stream.Write(e)
Expect(errInner).ShouldNot(HaveOccurred())
}
return baseTime
}