blob: e8bc07eed105bf61603b31b393850c096634dc7b [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package query_test
import (
"context"
"embed"
"encoding/base64"
"encoding/json"
"math"
"strconv"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
pb "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var (
	// svcs holds the services started in BeforeSuite.
	svcs *services
	// deferFn tears down everything setUpServices started; it stays nil until
	// BeforeSuite assigns it.
	deferFn func()
	// streamSchema is the "default/sw" stream the fixtures are written into.
	streamSchema stream.Stream
	// sT is the timestamp of the first fixture element written in BeforeSuite.
	sT time.Time
	// eT is one hour after sT; [sT, eT] is the window most specs query.
	eT time.Time
)
// BeforeSuite initializes logging, starts all modules, resolves the "default/sw"
// stream schema and seeds it from testdata/multiple_shards.json. The seeded
// elements start at sT; eT is set one hour later.
var _ = BeforeSuite(func() {
	Expect(logger.Init(logger.Logging{Env: "dev", Level: "warn"})).To(Succeed())

	svcs, deferFn = setUpServices()

	// streamSchema is package-level, so err must be declared separately to
	// assign with '=' rather than shadow it with ':='.
	var err error
	streamSchema, err = svcs.stream.Stream(&commonv1.Metadata{Group: "default", Name: "sw"})
	Expect(err).ShouldNot(HaveOccurred())

	sT = setUpStreamQueryData("multiple_shards.json", streamSchema)
	eT = sT.Add(time.Hour)
})
// AfterSuite releases what BeforeSuite set up. deferFn is still nil when
// BeforeSuite aborted before setUpServices returned; skipping the call then
// avoids a nil-function panic that would hide the original setup failure.
var _ = AfterSuite(func() {
	if deferFn != nil {
		deferFn()
	}
})
type services struct {
stream stream.Service
measure measure.Service
pipeline queue.Queue
}
func setUpServices() (*services, func()) {
// Init `Discovery` module
repo, err := discovery.NewServiceRepo(context.Background())
Expect(err).ShouldNot(HaveOccurred())
// Init `Queue` module
pipeline, err := queue.NewQueue(context.TODO(), repo)
Expect(err).ShouldNot(HaveOccurred())
// Init `Metadata` module
metadataService, err := metadata.NewService(context.TODO())
Expect(err).ShouldNot(HaveOccurred())
// Init `Stream` module
streamService, err := stream.NewService(context.TODO(), metadataService, repo, pipeline)
Expect(err).ShouldNot(HaveOccurred())
// Init `Measure` module
measureService, err := measure.NewService(context.TODO(), metadataService, repo, pipeline)
Expect(err).ShouldNot(HaveOccurred())
preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
preloadStreamSvc := &preloadStreamService{metaSvc: metadataService}
var flags []string
metaPath, metaDeferFunc, err := test.NewSpace()
Expect(err).NotTo(HaveOccurred())
flags = append(flags, "--metadata-root-path="+metaPath)
rootPath, deferFunc, err := test.NewSpace()
Expect(err).NotTo(HaveOccurred())
flags = append(flags, "--measure-root-path="+rootPath)
flags = append(flags, "--stream-root-path="+rootPath)
listenClientURL, listenPeerURL, err := test.NewEtcdListenUrls()
Expect(err).NotTo(HaveOccurred())
flags = append(flags, "--etcd-listen-client-url="+listenClientURL, "--etcd-listen-peer-url="+listenPeerURL)
executor, err := query.NewExecutor(context.TODO(), streamService, measureService, metadataService, repo, pipeline)
Expect(err).NotTo(HaveOccurred())
moduleDeferFunc := test.SetUpModules(
flags,
repo,
pipeline,
metadataService,
preloadMeasureSvc,
measureService,
preloadStreamSvc,
streamService,
executor,
)
return &services{
measure: measureService,
stream: streamService,
pipeline: pipeline,
}, func() {
moduleDeferFunc()
metaDeferFunc()
deferFunc()
}
}
// dataFS embeds the JSON fixtures under testdata/. setUpStreamQueryData reads
// the stream element templates from it.
//
//go:embed testdata/*.json
var dataFS embed.FS
// setUpStreamQueryData writes one stream element per template found in
// testdata/<dataFile> into s and returns the timestamp of the first element.
//
// dataFile must contain a JSON array. Each entry is decoded as a
// modelv1.TagFamilyForWrite. Element i is written with ElementId
// strconv.Itoa(i) and timestamp baseTime + i*500ms. It carries two tag
// families: first a family holding a single fixed binary tag, then the family
// decoded from the i-th template.
func setUpStreamQueryData(dataFile string, s stream.Stream) (baseTime time.Time) {
	baseTime = timestamp.NowMilli()

	content, err := dataFS.ReadFile("testdata/" + dataFile)
	Expect(err).ShouldNot(HaveOccurred())
	// Keep each template as raw bytes so protojson sees the fixture exactly as
	// written. Round-tripping through interface{} would turn every number into
	// a float64, which can silently lose precision on large int64 tag values.
	var templates []json.RawMessage
	Expect(json.Unmarshal(content, &templates)).Should(Succeed())

	bb, err := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
	Expect(err).ShouldNot(HaveOccurred())

	for i, template := range templates {
		searchTagFamily := &modelv1.TagFamilyForWrite{}
		Expect(protojson.Unmarshal(template, searchTagFamily)).Should(Succeed())
		e := &streamv1.ElementValue{
			ElementId: strconv.Itoa(i),
			// Space elements 500ms apart so their order in time matches file order.
			Timestamp: timestamppb.New(baseTime.Add(500 * time.Millisecond * time.Duration(i))),
			TagFamilies: []*modelv1.TagFamilyForWrite{
				{
					Tags: []*modelv1.TagValue{
						{
							Value: &modelv1.TagValue_BinaryData{
								BinaryData: bb,
							},
						},
					},
				},
				searchTagFamily,
			},
		}
		Expect(s.Write(e)).Should(Succeed())
	}
	return baseTime
}
var _ = Describe("Stream Query", func() {
	// queryData publishes req on the stream-query topic, waits for the reply
	// and returns its payload. Any publish or transport failure fails the
	// calling spec. The offset of 1 makes Gomega report the spec line, not this
	// helper.
	queryData := func(req interface{}) interface{} {
		m := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
		f, err := svcs.pipeline.Publish(data.TopicStreamQuery, m)
		ExpectWithOffset(1, err).ShouldNot(HaveOccurred())
		ExpectWithOffset(1, f).ShouldNot(BeNil())
		msg, err := f.Get()
		ExpectWithOffset(1, err).ShouldNot(HaveOccurred())
		ExpectWithOffset(1, msg).ShouldNot(BeNil())
		return msg.Data()
	}

	It("should return nothing when querying given timeRange which is out of the time range of data", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TimeRange(time.Unix(0, 0), time.Unix(0, 1)).
			Projection("searchable", "trace_id").
			Build()
		// NOTE(review): despite the title, this only asserts a successful round
		// trip; the payload is never checked for emptiness. Tighten once the
		// executor's empty-result shape is confirmed.
		queryData(req)
	})

	It("should return segments with data binary projection while querying given timeRange", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TimeRange(sT, eT).
			Projection("searchable", "trace_id").
			Projection("data", "data_binary").
			Build()
		got := queryData(req)
		Expect(got).Should(HaveLen(5))
		Expect(got).Should(HaveBinary())
	})

	It("should return all segments with data binary projection when querying max valid time-range", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			// min: 1677-09-21T00:12:43.145224194Z
			// max: 2262-04-11T23:47:16.854775806Z
			TimeRange(time.Unix(0, math.MinInt64), time.Unix(0, math.MaxInt64)).
			Projection("searchable", "trace_id").
			Projection("data", "data_binary").
			Build()
		got := queryData(req)
		Expect(got).Should(HaveLen(5))
		Expect(got).Should(HaveBinary())
	})

	It("should return all segments sorted by duration DESC", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TimeRange(sT, eT).
			OrderBy("duration", modelv1.Sort_SORT_DESC).
			Projection("searchable", "trace_id", "duration").
			Build()
		got := queryData(req)
		Expect(got).Should(HaveLen(5))
		elements, ok := got.([]*streamv1.Element)
		Expect(ok).To(BeTrue(), "unexpected payload type %T", got)
		Expect(logical.SortedByIndex(elements, 0, 1, modelv1.Sort_SORT_DESC)).To(BeTrue())
	})

	It("should return segments without binary when querying a given TraceID", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TagsInTagFamily("searchable", "trace_id", "=", "1").
			TimeRange(sT, eT).
			Projection("searchable", "trace_id").
			Build()
		got := queryData(req)
		Expect(got).Should(HaveLen(1))
		Expect(got).Should(NotHaveBinary())
	})

	It("should return segments with binary when querying a given TraceID", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TagsInTagFamily("searchable", "trace_id", "=", "1").
			TimeRange(sT, eT).
			Projection("data", "data_binary").
			Projection("searchable", "trace_id").
			Build()
		got := queryData(req)
		Expect(got).Should(HaveLen(1))
		Expect(got).Should(HaveBinary())
	})

	It("uses numerical index - query duration < 500", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TagsInTagFamily("searchable", "duration", "<", 500).
			TimeRange(sT, eT).
			Projection("searchable", "trace_id").
			Build()
		Expect(queryData(req)).Should(HaveLen(3))
	})

	It("uses numerical index - query duration <= 500", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TagsInTagFamily("searchable", "duration", "<=", 500).
			TimeRange(sT, eT).
			Projection("searchable", "trace_id").
			Build()
		Expect(queryData(req)).Should(HaveLen(4))
	})

	It("uses textual index - http.method == GET", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TagsInTagFamily("searchable", "http.method", "=", "GET").
			TimeRange(sT, eT).
			Projection("searchable", "trace_id").
			Build()
		got := queryData(req)
		Expect(got).Should(HaveLen(3))
		Expect(got).Should(NotHaveBinary())
	})

	It("uses textual index - http.method == GET with dataBinary projection", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TagsInTagFamily("searchable", "http.method", "=", "GET").
			TimeRange(sT, eT).
			Projection("data", "data_binary").
			Projection("searchable", "trace_id").
			Build()
		got := queryData(req)
		Expect(got).Should(HaveLen(3))
		Expect(got).Should(HaveBinary())
	})

	It("uses mixed index - status_code == 500 AND duration <= 100", func() {
		req := pb.NewStreamQueryRequestBuilder().
			Limit(10).
			Offset(0).
			Metadata("default", "sw").
			TagsInTagFamily("searchable", "status_code", "=", 500, "duration", "<=", 100).
			TimeRange(sT, eT).
			Projection("searchable", "trace_id").
			Build()
		Expect(queryData(req)).Should(HaveLen(1))
	})
})