| // Licensed to Apache Software Foundation (ASF) under one or more contributor |
| // license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright |
| // ownership. Apache Software Foundation (ASF) licenses this file to you under |
| // the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package query |
| |
| import ( |
| "context" |
| "os" |
| "path" |
| "testing" |
| "time" |
| |
| googleUUID "github.com/google/uuid" |
| "github.com/stretchr/testify/require" |
| |
| "github.com/apache/skywalking-banyandb/api/common" |
| "github.com/apache/skywalking-banyandb/api/event" |
| v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1" |
| "github.com/apache/skywalking-banyandb/banyand/discovery" |
| "github.com/apache/skywalking-banyandb/banyand/series" |
| "github.com/apache/skywalking-banyandb/banyand/series/trace" |
| "github.com/apache/skywalking-banyandb/banyand/storage" |
| "github.com/apache/skywalking-banyandb/pkg/bus" |
| "github.com/apache/skywalking-banyandb/pkg/logger" |
| "github.com/apache/skywalking-banyandb/pkg/pb" |
| ) |
| |
| var interval = time.Millisecond * 500 |
| |
| type entityValue struct { |
| seriesID string |
| entityID string |
| dataBinary []byte |
| ts time.Time |
| items []interface{} |
| } |
| |
| func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) { |
| // Bootstrap logger system |
| tester.NoError(logger.Init(logger.Logging{ |
| Env: "dev", |
| Level: "warn", |
| })) |
| |
| // Init `Discovery` module |
| repo, err := discovery.NewServiceRepo(context.Background()) |
| tester.NoError(err) |
| tester.NotNil(repo) |
| |
| // Init `Database` module |
| db, err := storage.NewDB(context.TODO(), repo) |
| tester.NoError(err) |
| uuid, err := googleUUID.NewUUID() |
| tester.NoError(err) |
| rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String()) |
| tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath})) |
| |
| // Init `Trace` module |
| traceSvc, err := trace.NewService(context.TODO(), db, repo) |
| tester.NoError(err) |
| |
| // Init `Query` module |
| executor, err := NewExecutor(context.TODO(), repo, nil, traceSvc, traceSvc) |
| tester.NoError(err) |
| |
| // :PreRun: |
| // 1) TraceSeries, |
| // 2) Database |
| err = traceSvc.PreRun() |
| tester.NoError(err) |
| |
| err = db.PreRun() |
| tester.NoError(err) |
| |
| err = executor.PreRun() |
| tester.NoError(err) |
| |
| return repo, traceSvc, func() { |
| db.GracefulStop() |
| _ = os.RemoveAll(rootPath) |
| } |
| } |
| |
| func setupData(tester *require.Assertions, baseTs time.Time, svc series.Service) { |
| metadata := common.Metadata{ |
| Spec: &v1.Metadata{ |
| Name: "sw", |
| Group: "default", |
| }, |
| } |
| |
| entityValues := []entityValue{ |
| { |
| ts: baseTs, |
| seriesID: "webapp_10.0.0.1", |
| entityID: "1", |
| dataBinary: []byte{11}, |
| items: []interface{}{ |
| "trace_id-xxfff.111323", |
| 0, |
| "webapp_id", |
| "10.0.0.1_id", |
| "/home_id", |
| "webapp", |
| "10.0.0.1", |
| "/home", |
| 300, |
| 1622933202000000000, |
| }, |
| }, |
| { |
| ts: baseTs.Add(interval), |
| seriesID: "gateway_10.0.0.2", |
| entityID: "2", |
| dataBinary: []byte{12}, |
| items: []interface{}{ |
| "trace_id-xxfff.111323a", |
| 1, |
| }, |
| }, |
| { |
| ts: baseTs.Add(interval * 2), |
| seriesID: "httpserver_10.0.0.3", |
| entityID: "3", |
| dataBinary: []byte{13}, |
| items: []interface{}{ |
| "trace_id-xxfff.111323", |
| 1, |
| "httpserver_id", |
| "10.0.0.3_id", |
| "/home_id", |
| "httpserver", |
| "10.0.0.3", |
| "/home", |
| 300, |
| 1622933202000000000, |
| "GET", |
| "200", |
| }, |
| }, |
| { |
| ts: baseTs.Add(interval * 3), |
| seriesID: "database_10.0.0.4", |
| entityID: "4", |
| dataBinary: []byte{14}, |
| items: []interface{}{ |
| "trace_id-xxfff.111323", |
| 0, |
| "database_id", |
| "10.0.0.4_id", |
| "/home_id", |
| "database", |
| "10.0.0.4", |
| "/home", |
| 300, |
| 1622933202000000000, |
| nil, |
| nil, |
| "MySQL", |
| "10.1.1.4", |
| }, |
| }, |
| { |
| ts: baseTs.Add(interval * 4), |
| seriesID: "mq_10.0.0.5", |
| entityID: "5", |
| dataBinary: []byte{15}, |
| items: []interface{}{ |
| "trace_id-zzpp.111323", |
| 0, |
| "mq_id", |
| "10.0.0.5_id", |
| "/home_id", |
| "mq", |
| "10.0.0.5", |
| "/home", |
| 300, |
| 1622933202000000000, |
| nil, |
| nil, |
| nil, |
| nil, |
| "test_topic", |
| "10.0.0.5", |
| }, |
| }, |
| { |
| ts: baseTs.Add(interval * 5), |
| seriesID: "database_10.0.0.6", |
| entityID: "6", |
| dataBinary: []byte{16}, |
| items: []interface{}{ |
| "trace_id-zzpp.111323", |
| 1, |
| "database_id", |
| "10.0.0.6_id", |
| "/home_id", |
| "database", |
| "10.0.0.6", |
| "/home", |
| 300, |
| 1622933202000000000, |
| nil, |
| nil, |
| "MySQL", |
| "10.1.1.6", |
| }, |
| }, |
| { |
| ts: baseTs.Add(interval * 6), |
| seriesID: "mq_10.0.0.7", |
| entityID: "7", |
| dataBinary: []byte{17}, |
| items: []interface{}{ |
| "trace_id-zzpp.111323", |
| 0, |
| "nq_id", |
| "10.0.0.7_id", |
| "/home_id", |
| "mq", |
| "10.0.0.7", |
| "/home", |
| 300, |
| 1622933202000000000, |
| nil, |
| nil, |
| nil, |
| nil, |
| "test_topic", |
| "10.0.0.7", |
| }, |
| }, |
| } |
| |
| for _, ev := range entityValues { |
| ok, err := svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...) |
| tester.True(ok) |
| tester.NoError(err) |
| } |
| } |
| |
| func TestQueryProcessor(t *testing.T) { |
| tester := require.New(t) |
| |
| // setup services |
| repo, traceSvc, gracefulStop := setupServices(tester) |
| defer gracefulStop() |
| |
| baseTs := time.Now() |
| setupData(tester, baseTs, traceSvc) |
| |
| tests := []struct { |
| // name of the test case |
| name string |
| // queryGenerator is used to generate a Query |
| queryGenerator func(baseTs time.Time) *v1.QueryRequest |
| // wantLen is the length of entities expected to return |
| wantLen int |
| }{ |
| { |
| name: "query given timeRange is out of the time range of data", |
| queryGenerator: func(baseTs time.Time) *v1.QueryRequest { |
| return pb.NewQueryRequestBuilder(). |
| Limit(10). |
| Offset(0). |
| Metadata("default", "sw"). |
| TimeRange(time.Unix(0, 0), time.Unix(0, 1)). |
| Projection("trace_id"). |
| Build() |
| }, |
| wantLen: 0, |
| }, |
| { |
| name: "query TraceID given timeRange includes the time range of data", |
| queryGenerator: func(baseTs time.Time) *v1.QueryRequest { |
| return pb.NewQueryRequestBuilder(). |
| Limit(10). |
| Offset(0). |
| Metadata("default", "sw"). |
| Fields("trace_id", "=", "trace_id-zzpp.111323"). |
| TimeRange(baseTs.Add(-1*time.Minute), baseTs.Add(1*time.Minute)). |
| Projection("trace_id"). |
| Build() |
| }, |
| wantLen: 3, |
| }, |
| { |
| name: "query TraceID given timeRange includes the time range of data but limit to 1", |
| queryGenerator: func(baseTs time.Time) *v1.QueryRequest { |
| return pb.NewQueryRequestBuilder(). |
| Limit(1). |
| Offset(0). |
| Metadata("default", "sw"). |
| Fields("trace_id", "=", "trace_id-zzpp.111323"). |
| TimeRange(baseTs.Add(-1*time.Minute), baseTs.Add(1*time.Minute)). |
| Projection("trace_id"). |
| Build() |
| }, |
| wantLen: 1, |
| }, |
| } |
| |
| for _, tt := range tests { |
| t.Run(tt.name, func(t *testing.T) { |
| singleTester := require.New(t) |
| now := time.Now() |
| m := bus.NewMessage(bus.MessageID(now.UnixNano()), tt.queryGenerator(baseTs)) |
| f, err := repo.Publish(event.TopicQueryEvent, m) |
| singleTester.NoError(err) |
| singleTester.NotNil(f) |
| msg, err := f.Get() |
| singleTester.NoError(err) |
| singleTester.NotNil(msg) |
| // TODO: better error response |
| singleTester.NotNil(msg.Data()) |
| singleTester.Len(msg.Data(), tt.wantLen) |
| }) |
| } |
| } |