Process query from liaison (#18)
* add processor
* use custom analyzer
* try add testcase
* fix license
* fix test
* remove fmt.Sprintf usage in the read/write path
* fix processor
* collect chunks from all shards
* fix ci
* finish indexScan with shards
* change IndexSearch API
* remove todo
* refactor mock
* add Write API
* refactor traceIDFetch
* add log in queryProcessor
* stop loop if chunkSet is empty
* get field names from schema
* add TODO
* generate random dir for each test
* include os.RemoveAll in return func
* fix typo
diff --git a/Makefile b/Makefile
index 73e7946..1314666 100644
--- a/Makefile
+++ b/Makefile
@@ -44,6 +44,7 @@
##@ Test targets
test: TARGET=test
+test: PROJECTS:=$(PROJECTS) pkg
test: default ## Run the unit tests in all projects
test-race: TARGET=test-race
diff --git a/api/event/query.go b/api/event/query.go
new file mode 100644
index 0000000..be842a9
--- /dev/null
+++ b/api/event/query.go
@@ -0,0 +1,33 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package event
+
+import (
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+var (
+ QueryEventKindVersion = common.KindVersion{
+ Version: "v1",
+ Kind: "event-query",
+ }
+ // TopicQueryEvent is a bidirectional topic for request/response communication
+ // between Liaison and Query module
+ TopicQueryEvent = bus.BiTopic(QueryEventKindVersion.String())
+)
diff --git a/banyand/index/index.go b/banyand/index/index.go
index a21bd85..6ab47f2 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-//go:generate mockgen -destination=./index_mock.go -package=index . Repo
package index
import (
@@ -35,8 +34,9 @@
Op apiv1.PairQuery_BinaryOp
}
+//go:generate mockgen -destination=./index_mock.go -package=index . Repo
type Repo interface {
- Search(series common.Metadata, shardID uint, startTime, endTime uint64, conditions []Condition) (posting.List, error)
+ Search(seriesMeta common.Metadata, shardID uint, startTime, endTime uint64, indexObjectName string, conditions []Condition) (posting.List, error)
}
type Builder interface {
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 2d50601..b024cbe 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -65,7 +65,7 @@
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate trace series")
}
- q, err := query.NewExecutor(ctx, idx, traceSeries)
+ q, err := query.NewExecutor(ctx, repo, idx, traceSeries, traceSeries)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate query executor")
}
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
new file mode 100644
index 0000000..2bbb121
--- /dev/null
+++ b/banyand/query/processor.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/event"
+ v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ apischema "github.com/apache/skywalking-banyandb/api/schema"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/index"
+ "github.com/apache/skywalking-banyandb/banyand/series"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/query/executor"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+const (
+ moduleName = "query-processor"
+)
+
+var (
+ _ Executor = (*queryProcessor)(nil)
+ _ bus.MessageListener = (*queryProcessor)(nil)
+ _ executor.ExecutionContext = (*queryProcessor)(nil)
+)
+
+type queryProcessor struct {
+ index.Repo
+ series.UniModel
+ logger *logger.Logger
+ schemaRepo series.SchemaRepo
+ log *logger.Logger
+ serviceRepo discovery.ServiceRepo
+}
+
+func (q *queryProcessor) Rev(message bus.Message) (resp bus.Message) {
+ queryCriteria, ok := message.Data().(*v1.QueryRequest)
+ if !ok {
+ q.log.Warn().Msg("invalid event data type")
+ return
+ }
+ q.log.Info().
+ Msg("received a query event")
+ analyzer := logical.DefaultAnalyzer()
+ metadata := &common.Metadata{
+ KindVersion: apischema.SeriesKindVersion,
+ Spec: queryCriteria.GetMetadata(),
+ }
+ s, err := analyzer.BuildTraceSchema(context.TODO(), *metadata)
+ if err != nil {
+ q.logger.Error().Err(err).Msg("fail to build trace schema")
+ return
+ }
+
+ p, err := analyzer.Analyze(context.TODO(), queryCriteria, metadata, s)
+ if err != nil {
+ q.logger.Error().Err(err).Msg("fail to analyze the query request")
+ return
+ }
+
+ entities, err := p.Execute(q)
+ if err != nil {
+ q.logger.Error().Err(err).Msg("fail to execute the query plan")
+ return
+ }
+
+ now := time.Now().UnixNano()
+ resp = bus.NewMessage(bus.MessageID(now), entities)
+
+ return
+}
+
+func (q *queryProcessor) Name() string {
+ return moduleName
+}
+
+func (q *queryProcessor) PreRun() error {
+ q.log = logger.GetLogger(moduleName)
+ return q.serviceRepo.Subscribe(event.TopicQueryEvent, q)
+}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
new file mode 100644
index 0000000..7ebe12f
--- /dev/null
+++ b/banyand/query/processor_test.go
@@ -0,0 +1,333 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+ "context"
+ "os"
+ "path"
+ "testing"
+ "time"
+
+ googleUUID "github.com/google/uuid"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/event"
+ v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/series"
+ "github.com/apache/skywalking-banyandb/banyand/series/trace"
+ "github.com/apache/skywalking-banyandb/banyand/storage"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+var interval = time.Millisecond * 500
+
+type entityValue struct {
+ seriesID string
+ entityID string
+ dataBinary []byte
+ ts time.Time
+ items []interface{}
+}
+
+func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) {
+ // Bootstrap logger system
+ tester.NoError(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "warn",
+ }))
+
+ // Init `Discovery` module
+ repo, err := discovery.NewServiceRepo(context.Background())
+ tester.NoError(err)
+ tester.NotNil(repo)
+
+ // Init `Database` module
+ db, err := storage.NewDB(context.TODO(), repo)
+ tester.NoError(err)
+ uuid, err := googleUUID.NewUUID()
+ tester.NoError(err)
+ rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
+ tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
+
+ // Init `Trace` module
+ traceSvc, err := trace.NewService(context.TODO(), db, repo)
+ tester.NoError(err)
+
+ // Init `Query` module
+ executor, err := NewExecutor(context.TODO(), repo, nil, traceSvc, traceSvc)
+ tester.NoError(err)
+
+ // :PreRun:
+ // 1) TraceSeries,
+ // 2) Database
+ err = traceSvc.PreRun()
+ tester.NoError(err)
+
+ err = db.PreRun()
+ tester.NoError(err)
+
+ err = executor.PreRun()
+ tester.NoError(err)
+
+ return repo, traceSvc, func() {
+ db.GracefulStop()
+ _ = os.RemoveAll(rootPath)
+ }
+}
+
+func setupData(tester *require.Assertions, baseTs time.Time, svc series.Service) {
+ metadata := common.Metadata{
+ Spec: &v1.Metadata{
+ Name: "sw",
+ Group: "default",
+ },
+ }
+
+ entityValues := []entityValue{
+ {
+ ts: baseTs,
+ seriesID: "webapp_10.0.0.1",
+ entityID: "1",
+ dataBinary: []byte{11},
+ items: []interface{}{
+ "trace_id-xxfff.111323",
+ 0,
+ "webapp_id",
+ "10.0.0.1_id",
+ "/home_id",
+ "webapp",
+ "10.0.0.1",
+ "/home",
+ 300,
+ 1622933202000000000,
+ },
+ },
+ {
+ ts: baseTs.Add(interval),
+ seriesID: "gateway_10.0.0.2",
+ entityID: "2",
+ dataBinary: []byte{12},
+ items: []interface{}{
+ "trace_id-xxfff.111323a",
+ 1,
+ },
+ },
+ {
+ ts: baseTs.Add(interval * 2),
+ seriesID: "httpserver_10.0.0.3",
+ entityID: "3",
+ dataBinary: []byte{13},
+ items: []interface{}{
+ "trace_id-xxfff.111323",
+ 1,
+ "httpserver_id",
+ "10.0.0.3_id",
+ "/home_id",
+ "httpserver",
+ "10.0.0.3",
+ "/home",
+ 300,
+ 1622933202000000000,
+ "GET",
+ "200",
+ },
+ },
+ {
+ ts: baseTs.Add(interval * 3),
+ seriesID: "database_10.0.0.4",
+ entityID: "4",
+ dataBinary: []byte{14},
+ items: []interface{}{
+ "trace_id-xxfff.111323",
+ 0,
+ "database_id",
+ "10.0.0.4_id",
+ "/home_id",
+ "database",
+ "10.0.0.4",
+ "/home",
+ 300,
+ 1622933202000000000,
+ nil,
+ nil,
+ "MySQL",
+ "10.1.1.4",
+ },
+ },
+ {
+ ts: baseTs.Add(interval * 4),
+ seriesID: "mq_10.0.0.5",
+ entityID: "5",
+ dataBinary: []byte{15},
+ items: []interface{}{
+ "trace_id-zzpp.111323",
+ 0,
+ "mq_id",
+ "10.0.0.5_id",
+ "/home_id",
+ "mq",
+ "10.0.0.5",
+ "/home",
+ 300,
+ 1622933202000000000,
+ nil,
+ nil,
+ nil,
+ nil,
+ "test_topic",
+ "10.0.0.5",
+ },
+ },
+ {
+ ts: baseTs.Add(interval * 5),
+ seriesID: "database_10.0.0.6",
+ entityID: "6",
+ dataBinary: []byte{16},
+ items: []interface{}{
+ "trace_id-zzpp.111323",
+ 1,
+ "database_id",
+ "10.0.0.6_id",
+ "/home_id",
+ "database",
+ "10.0.0.6",
+ "/home",
+ 300,
+ 1622933202000000000,
+ nil,
+ nil,
+ "MySQL",
+ "10.1.1.6",
+ },
+ },
+ {
+ ts: baseTs.Add(interval * 6),
+ seriesID: "mq_10.0.0.7",
+ entityID: "7",
+ dataBinary: []byte{17},
+ items: []interface{}{
+ "trace_id-zzpp.111323",
+ 0,
+ "nq_id",
+ "10.0.0.7_id",
+ "/home_id",
+ "mq",
+ "10.0.0.7",
+ "/home",
+ 300,
+ 1622933202000000000,
+ nil,
+ nil,
+ nil,
+ nil,
+ "test_topic",
+ "10.0.0.7",
+ },
+ },
+ }
+
+ for _, ev := range entityValues {
+ ok, err := svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...)
+ tester.True(ok)
+ tester.NoError(err)
+ }
+}
+
+func TestQueryProcessor(t *testing.T) {
+ tester := require.New(t)
+
+ // setup services
+ repo, traceSvc, gracefulStop := setupServices(tester)
+ defer gracefulStop()
+
+ baseTs := time.Now()
+ setupData(tester, baseTs, traceSvc)
+
+ tests := []struct {
+ // name of the test case
+ name string
+ // queryGenerator is used to generate a Query
+ queryGenerator func(baseTs time.Time) *v1.QueryRequest
+ // wantLen is the length of entities expected to return
+ wantLen int
+ }{
+ {
+ name: "query given timeRange is out of the time range of data",
+ queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ TimeRange(time.Unix(0, 0), time.Unix(0, 1)).
+ Projection("trace_id").
+ Build()
+ },
+ wantLen: 0,
+ },
+ {
+ name: "query TraceID given timeRange includes the time range of data",
+ queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ Fields("trace_id", "=", "trace_id-zzpp.111323").
+ TimeRange(baseTs.Add(-1*time.Minute), baseTs.Add(1*time.Minute)).
+ Projection("trace_id").
+ Build()
+ },
+ wantLen: 3,
+ },
+ {
+ name: "query TraceID given timeRange includes the time range of data but limit to 1",
+ queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(1).
+ Offset(0).
+ Metadata("default", "sw").
+ Fields("trace_id", "=", "trace_id-zzpp.111323").
+ TimeRange(baseTs.Add(-1*time.Minute), baseTs.Add(1*time.Minute)).
+ Projection("trace_id").
+ Build()
+ },
+ wantLen: 1,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ singleTester := require.New(t)
+ now := time.Now()
+ m := bus.NewMessage(bus.MessageID(now.UnixNano()), tt.queryGenerator(baseTs))
+ f, err := repo.Publish(event.TopicQueryEvent, m)
+ singleTester.NoError(err)
+ singleTester.NotNil(f)
+ msg, err := f.Get()
+ singleTester.NoError(err)
+ singleTester.NotNil(msg)
+ // TODO: better error response
+ singleTester.NotNil(msg.Data())
+ singleTester.Len(msg.Data(), tt.wantLen)
+ })
+ }
+}
diff --git a/banyand/query/query.go b/banyand/query/query.go
index 56d0b0f..3c3f9a8 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -20,8 +20,10 @@
import (
"context"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/series"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -29,6 +31,12 @@
run.PreRunner
}
-func NewExecutor(ctx context.Context, idx index.Repo, s series.UniModel) (Executor, error) {
- return nil, nil
+func NewExecutor(_ context.Context, serviceRepo discovery.ServiceRepo, indexRepo index.Repo, uniModel series.UniModel, schemaRepo series.SchemaRepo) (Executor, error) {
+ return &queryProcessor{
+ Repo: indexRepo,
+ UniModel: uniModel,
+ schemaRepo: schemaRepo,
+ serviceRepo: serviceRepo,
+ logger: logger.GetLogger("query"),
+ }, nil
}
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index e2cd6af..1e5f861 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -21,15 +21,15 @@
"context"
"github.com/apache/skywalking-banyandb/banyand/discovery"
- bus2 "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
)
type Queue interface {
run.Config
run.PreRunner
- bus2.Subscriber
- bus2.Publisher
+ bus.Subscriber
+ bus.Publisher
}
func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) {
diff --git a/banyand/series/series.go b/banyand/series/series.go
index 9dc8263..f10b60e 100644
--- a/banyand/series/series.go
+++ b/banyand/series/series.go
@@ -20,6 +20,7 @@
import (
"context"
+ "time"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
@@ -54,6 +55,8 @@
FetchEntity(traceSeries common.Metadata, shardID uint, chunkIDs posting2.List, opt ScanOptions) ([]data.Entity, error)
//ScanEntity returns data.Entity between a duration by ScanOptions
ScanEntity(traceSeries common.Metadata, startTime, endTime uint64, opt ScanOptions) ([]data.Entity, error)
+ // Write entity to the given traceSeries
+ Write(traceSeries common.Metadata, ts time.Time, seriesID, entityID string, dataBinary []byte, items ...interface{}) (bool, error)
}
//UniModel combines Trace, Metric and Log repositories into a union interface
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index 507f380..51f1603 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -19,11 +19,14 @@
import (
"context"
+ "os"
+ "path"
"sort"
"strings"
"testing"
"time"
+ googleUUID "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/apache/skywalking-banyandb/api/common"
@@ -56,7 +59,10 @@
_ = logger.Bootstrap()
db, err := storage.NewDB(context.TODO(), nil)
assert.NoError(t, err)
- assert.NoError(t, db.FlagSet().Parse(nil))
+ uuid, err := googleUUID.NewUUID()
+ assert.NoError(t, err)
+ rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
+ assert.NoError(t, db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
svc, err := NewService(context.TODO(), db, nil)
assert.NoError(t, err)
assert.NoError(t, svc.PreRun())
@@ -69,7 +75,10 @@
},
})
assert.NoError(t, err)
- return ts, db.GracefulStop
+ return ts, func() {
+ db.GracefulStop()
+ _ = os.RemoveAll(rootPath)
+ }
}
type seriesEntity struct {
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index fd83660..2551813 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -19,7 +19,6 @@
import (
"context"
- "fmt"
"strconv"
"time"
@@ -35,13 +34,14 @@
"github.com/apache/skywalking-banyandb/banyand/series/schema"
"github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/partition"
"github.com/apache/skywalking-banyandb/pkg/pb"
posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
)
const (
- traceSeriesIDTemp = "%s:%s"
// KV stores
chunkIDMapping = "chunkIDMapping"
startTimeIndex = "startTimeIndex"
@@ -111,7 +111,7 @@
return errTS
}
s.db.Register(ts)
- id := fmt.Sprintf(traceSeriesIDTemp, ts.name, ts.group)
+ id := formatTraceSeriesID(ts.name, ts.group)
s.schemaMap[id] = ts
s.l.Info().Str("id", id).Msg("initialize Trace series")
}
@@ -184,7 +184,7 @@
}
func (s *service) getSeries(traceSeries common.Metadata) (*traceSeries, error) {
- id := getTraceSeriesID(traceSeries)
+ id := formatTraceSeriesID(traceSeries.Spec.GetName(), traceSeries.Spec.GetGroup())
s.l.Debug().Str("id", id).Msg("got Trace series")
ts, ok := s.schemaMap[id]
if !ok {
@@ -193,8 +193,30 @@
return ts, nil
}
-func getTraceSeriesID(traceSeries common.Metadata) string {
- return fmt.Sprintf(traceSeriesIDTemp, traceSeries.Spec.GetName(), traceSeries.Spec.GetGroup())
+func (s *service) Write(traceSeriesMetadata common.Metadata, ts time.Time, seriesID, entityID string, dataBinary []byte, items ...interface{}) (bool, error) {
+ traceSeries, err := s.getSeries(traceSeriesMetadata)
+ if err != nil {
+ return false, err
+ }
+
+ ev := pb.NewEntityValueBuilder().
+ DataBinary(dataBinary).
+ EntityID(entityID).
+ Fields(items...).
+ Timestamp(ts).
+ Build()
+
+ seriesIDBytes := []byte(seriesID)
+ shardID := partition.ShardID(seriesIDBytes, traceSeries.shardNum)
+
+ _, err = traceSeries.Write(common.SeriesID(convert.Hash(seriesIDBytes)), shardID, data.EntityValue{
+ EntityValue: ev,
+ })
+ return err == nil, err
+}
+
+func formatTraceSeriesID(name, group string) string {
+ return name + ":" + group
}
type traceSeries struct {
diff --git a/go.mod b/go.mod
index fe24d89..5ad72a0 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@
github.com/golang/mock v1.5.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
+ github.com/google/uuid v1.3.0
github.com/klauspost/compress v1.13.1 // indirect
github.com/oklog/run v1.1.0
github.com/pkg/errors v0.9.1
diff --git a/go.sum b/go.sum
index 42f0ca5..ce8fd5c 100644
--- a/go.sum
+++ b/go.sum
@@ -173,6 +173,8 @@
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
diff --git a/pkg/query/logical/analyzer.go b/pkg/query/logical/analyzer.go
index bb9296f..585a746 100644
--- a/pkg/query/logical/analyzer.go
+++ b/pkg/query/logical/analyzer.go
@@ -67,8 +67,9 @@
}
s := &schema{
- indexRule: indexRule,
- fieldMap: make(map[string]*fieldSpec),
+ traceSeries: traceSeries.Spec,
+ indexRule: indexRule,
+ fieldMap: make(map[string]*fieldSpec),
}
// generate the schema of the fields for the traceSeries
@@ -94,21 +95,23 @@
useIndexScan := false
var fieldExprs []Expr
traceState := series.TraceStateDefault
+
for _, pairQuery := range criteria.GetFields() {
op := pairQuery.GetOp()
typedPair := pairQuery.GetCondition()
switch v := typedPair.GetTyped().(type) {
case *apiv1.TypedPair_StrPair:
// check special field `trace_id`
- if v.StrPair.GetKey() == "trace_id" {
- return TraceIDFetch(v.StrPair.GetValues()[0], traceMetadata, s), nil
+ if v.StrPair.GetKey() == s.TraceIDFieldName() {
+ plan = TraceIDFetch(v.StrPair.GetValues()[0], traceMetadata, projStr...)
+ break
}
useIndexScan = true
lit := parseStrLiteral(v.StrPair)
fieldExprs = append(fieldExprs, binaryOpFactory[op](NewFieldRef(v.StrPair.GetKey()), lit))
case *apiv1.TypedPair_IntPair:
// check special field `state`
- if v.IntPair.GetKey() == "state" {
+ if v.IntPair.GetKey() == s.TraceStateFieldName() {
traceState = series.TraceState(v.IntPair.GetValues()[0])
continue
}
@@ -120,11 +123,14 @@
}
}
- // first check if we can use index-scan
- if useIndexScan {
- plan = IndexScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, fieldExprs, traceState, projStr...)
- } else {
- plan = TableScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, traceState, projStr...)
+ // if plan is already assigned, skip
+ if plan == nil {
+ // first check if we can use index-scan
+ if useIndexScan {
+ plan = IndexScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, fieldExprs, traceState, projStr...)
+ } else {
+ plan = TableScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, traceState, projStr...)
+ }
}
} else {
plan = TableScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, series.TraceStateDefault, projStr...)
diff --git a/pkg/query/logical/analyzer_test.go b/pkg/query/logical/analyzer_test.go
index 485cf84..eabd79c 100644
--- a/pkg/query/logical/analyzer_test.go
+++ b/pkg/query/logical/analyzer_test.go
@@ -139,7 +139,8 @@
assert.NoError(err)
assert.NotNil(plan)
- correctPlan := logical.TraceIDFetch("123", metadata, schema)
+ correctPlan, err := logical.TraceIDFetch("123", metadata).Analyze(schema)
+ assert.NoError(err)
assert.NotNil(correctPlan)
cmp.Equal(plan, correctPlan)
}
diff --git a/pkg/query/logical/common_test.go b/pkg/query/logical/common_test.go
index 236897f..f77bb54 100644
--- a/pkg/query/logical/common_test.go
+++ b/pkg/query/logical/common_test.go
@@ -36,6 +36,7 @@
"github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/pkg/pb"
"github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
@@ -150,21 +151,39 @@
return ec
}
-//TODO: pass correct shardID
func (f *mockDataFactory) MockIndexScan(startTime, endTime time.Time, indexMatches ...*indexMatcher) executor.ExecutionContext {
ec := executor.NewMockExecutionContext(f.ctrl)
+ usedShards := make(map[uint]posting.List)
+
for _, im := range indexMatches {
ec.
EXPECT().
- Search(*f.traceMetadata, uint(0), uint64(startTime.UnixNano()), uint64(endTime.UnixNano()), im).
+ Search(*f.traceMetadata, gomock.Eq(im.shardID), uint64(startTime.UnixNano()), uint64(endTime.UnixNano()), gomock.Any(), im).
Return(im.chunkIDs, nil)
+
+ if list, ok := usedShards[im.shardID]; ok {
+ _ = list.Intersect(im.chunkIDs)
+ } else {
+ usedShards[im.shardID] = im.chunkIDs
+ }
+
+ ec.
+ EXPECT().
+ Search(*f.traceMetadata, gomock.Not(gomock.Eq(im.shardID)), uint64(startTime.UnixNano()), uint64(endTime.UnixNano()), gomock.Any(), im).
+ Return(roaring.NewPostingList(), nil)
}
- ec.
- EXPECT().
- FetchEntity(*f.traceMetadata, uint(0), gomock.Any(), gomock.Any()).
- DoAndReturn(func(_ common.Metadata, _ uint, chunkIDs posting.List, _ series.ScanOptions) ([]data.Entity, error) {
- return GenerateEntities(GeneratorFromArray(chunkIDs)), nil
- })
+
+ for shardID := uint(0); shardID < uint(f.s.ShardNumber()); shardID++ {
+ if chunkList, ok := usedShards[shardID]; ok && chunkList.Len() > 0 {
+ ec.
+ EXPECT().
+ FetchEntity(*f.traceMetadata, shardID, gomock.Any(), gomock.Any()).
+ DoAndReturn(func(_ common.Metadata, _ uint, chunkIDs posting.List, _ series.ScanOptions) ([]data.Entity, error) {
+ return GenerateEntities(GeneratorFromArray(chunkIDs)), nil
+ })
+ }
+ }
+
return ec
}
@@ -193,6 +212,7 @@
type indexMatcher struct {
key string
+ shardID uint
chunkIDs posting.List
}
@@ -211,9 +231,10 @@
return fmt.Sprintf("is search for key %s", i.key)
}
-func newIndexMatcher(key string, chunkIDs posting.List) *indexMatcher {
+func newIndexMatcher(key string, shardID uint, chunkIDs posting.List) *indexMatcher {
return &indexMatcher{
key: key,
+ shardID: shardID,
chunkIDs: chunkIDs,
}
}
diff --git a/pkg/query/logical/plan_execution_test.go b/pkg/query/logical/plan_execution_test.go
index 199b029..b4fa588 100644
--- a/pkg/query/logical/plan_execution_test.go
+++ b/pkg/query/logical/plan_execution_test.go
@@ -128,7 +128,8 @@
traceID := "asdf1234"
- p := logical.TraceIDFetch(traceID, m, s)
+ p, err := logical.TraceIDFetch(traceID, m).Analyze(s)
+ assert.NoError(err)
assert.NotNil(p)
f := newMockDataFactory(ctrl, m, s, 10)
entities, err := p.Execute(f.MockTraceIDFetch(traceID))
@@ -156,7 +157,7 @@
unresolvedPlan: logical.IndexScan(st.UnixNano(), et.UnixNano(), m, []logical.Expr{
logical.Eq(logical.NewFieldRef("http.method"), logical.Str("GET")),
}, series.TraceStateDefault),
- indexMatchers: []*indexMatcher{newIndexMatcher("http.method", roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))},
+ indexMatchers: []*indexMatcher{newIndexMatcher("http.method", 0, roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))},
wantLength: 10,
},
{
@@ -166,8 +167,8 @@
logical.Eq(logical.NewFieldRef("service_id"), logical.Str("app")),
}, series.TraceStateDefault),
indexMatchers: []*indexMatcher{
- newIndexMatcher("http.method", roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
- newIndexMatcher("service_id", roaring.NewPostingListWithInitialData(1, 3, 5, 7, 9)),
+ newIndexMatcher("http.method", 0, roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
+ newIndexMatcher("service_id", 0, roaring.NewPostingListWithInitialData(1, 3, 5, 7, 9)),
},
wantLength: 5,
},
@@ -178,8 +179,8 @@
logical.Eq(logical.NewFieldRef("service_id"), logical.Str("app")),
}, series.TraceStateDefault),
indexMatchers: []*indexMatcher{
- newIndexMatcher("http.method", roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
- newIndexMatcher("service_id", roaring.NewPostingList()),
+ newIndexMatcher("http.method", 0, roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
+ newIndexMatcher("service_id", 0, roaring.NewPostingList()),
},
wantLength: 0,
},
diff --git a/pkg/query/logical/plan_indexscan.go b/pkg/query/logical/plan_indexscan.go
index e50249a..2a1f12f 100644
--- a/pkg/query/logical/plan_indexscan.go
+++ b/pkg/query/logical/plan_indexscan.go
@@ -30,7 +30,7 @@
"github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/pkg/posting"
- executor2 "github.com/apache/skywalking-banyandb/pkg/query/executor"
+ "github.com/apache/skywalking-banyandb/pkg/query/executor"
)
var _ UnresolvedPlan = (*unresolvedIndexScan)(nil)
@@ -107,31 +107,54 @@
traceState series.TraceState
}
-func (i *indexScan) Execute(ec executor2.ExecutionContext) ([]data.Entity, error) {
- var chunkSet posting.List
- for _, exprs := range i.conditionMap {
- // TODO: Discuss which metadata should be used!
- // 1) traceSeries Metadata: indirect mapping
- // 2) indexRule Metadata: cannot uniquely determine traceSeries
- // TODO: should pass correct shardID
- chunks, err := ec.Search(*i.traceMetadata, 0, uint64(i.startTime), uint64(i.endTime), convertToConditions(exprs))
+func (i *indexScan) Execute(ec executor.ExecutionContext) ([]data.Entity, error) {
+ dataEntities := make([]data.Entity, 0)
+
+ // iterate over shards
+ // TODO: we have to push down Limit and Offset to set a threshold to IndexSearch operations
+ for shardID := uint32(0); shardID < i.schema.ShardNumber(); shardID++ {
+ var chunkSet posting.List
+
+ // first collect all chunkIDs via indexes
+ for idxObj, exprs := range i.conditionMap {
+ // 1) traceSeries Metadata -> IndexObject
+ // 2) IndexObject -> Fields
+ chunks, err := ec.Search(*i.traceMetadata, uint(shardID), uint64(i.startTime), uint64(i.endTime), idxObj.GetName(), convertToConditions(exprs))
+ if err != nil {
+ return nil, err
+ }
+ if chunkSet == nil {
+ // chunkSet is nil before the first assignment
+ chunkSet = chunks
+ } else {
+ // afterwards, it must not be nil
+ _ = chunkSet.Intersect(chunks)
+ // stop loop if chunkSet is empty
+ if chunkSet.Len() == 0 {
+ continue
+ }
+ }
+ }
+
+ if chunkSet == nil || chunkSet.Len() == 0 {
+ continue
+ }
+
+ // fetch entities with chunkIDs
+ entitiesFromSingleShard, err := ec.FetchEntity(*i.traceMetadata, uint(shardID), chunkSet, series.ScanOptions{
+ Projection: i.projectionFields,
+ State: i.traceState,
+ })
+
if err != nil {
return nil, err
}
- if chunkSet == nil {
- // chunkSet is nil before the first assignment
- chunkSet = chunks
- } else {
- // afterwards, it must not be nil
- _ = chunkSet.Intersect(chunks)
- }
+
+ // merge results
+ dataEntities = append(dataEntities, entitiesFromSingleShard...)
}
- //TODO: pass correct shardID
- return ec.FetchEntity(*i.traceMetadata, 0, chunkSet, series.ScanOptions{
- Projection: i.projectionFields,
- State: i.traceState,
- })
+ return dataEntities, nil
}
func (i *indexScan) String() string {
diff --git a/pkg/query/logical/plan_tablescan.go b/pkg/query/logical/plan_tablescan.go
index 0480faa..6adf6b5 100644
--- a/pkg/query/logical/plan_tablescan.go
+++ b/pkg/query/logical/plan_tablescan.go
@@ -47,11 +47,10 @@
func (u *unresolvedTableScan) Analyze(schema Schema) (Plan, error) {
if u.projectionFields == nil || len(u.projectionFields) == 0 {
return &tableScan{
- startTime: u.startTime,
- endTime: u.endTime,
- projectionFieldRefs: nil,
- schema: schema,
- traceMetadata: u.traceMetadata,
+ startTime: u.startTime,
+ endTime: u.endTime,
+ schema: schema,
+ traceMetadata: u.traceMetadata,
}, nil
}
diff --git a/pkg/query/logical/plan_traceid.go b/pkg/query/logical/plan_traceid.go
index 4f87a6e..cc4255e 100644
--- a/pkg/query/logical/plan_traceid.go
+++ b/pkg/query/logical/plan_traceid.go
@@ -21,6 +21,7 @@
"fmt"
"github.com/google/go-cmp/cmp"
+ "github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
@@ -28,20 +29,51 @@
"github.com/apache/skywalking-banyandb/pkg/query/executor"
)
+var _ UnresolvedPlan = (*unresolvedTraceIDFetch)(nil)
var _ Plan = (*traceIDFetch)(nil)
-type traceIDFetch struct {
- metadata *common.Metadata
- schema Schema
- traceID string
+type unresolvedTraceIDFetch struct {
+ metadata *common.Metadata
+ traceID string
+ projectionFields []string
}
-func (t *traceIDFetch) Execute(ec executor.ExecutionContext) ([]data.Entity, error) {
- traceData, err := ec.FetchTrace(*t.metadata, t.traceID, series.ScanOptions{})
+func (t *unresolvedTraceIDFetch) Analyze(s Schema) (Plan, error) {
+ if t.projectionFields == nil || len(t.projectionFields) == 0 {
+ return &traceIDFetch{
+ metadata: t.metadata,
+ schema: s,
+ traceID: t.traceID,
+ }, nil
+ }
+
+ if s == nil {
+ return nil, errors.Wrap(ErrInvalidSchema, "nil")
+ }
+
+ fieldRefs, err := s.CreateRef(t.projectionFields...)
if err != nil {
return nil, err
}
- return traceData.Entities, nil
+ return &traceIDFetch{
+ projectionFields: t.projectionFields,
+ projectionFieldRefs: fieldRefs,
+ schema: s,
+ traceID: t.traceID,
+ metadata: t.metadata,
+ }, nil
+}
+
+func (t *unresolvedTraceIDFetch) Type() PlanType {
+ return PlanTraceIDFetch
+}
+
+type traceIDFetch struct {
+ metadata *common.Metadata
+ traceID string
+ projectionFields []string
+ projectionFieldRefs []*FieldRef
+ schema Schema
}
func (t *traceIDFetch) String() string {
@@ -52,14 +84,14 @@
)
}
-func (t *traceIDFetch) Type() PlanType {
- return PlanTraceIDFetch
-}
-
func (t *traceIDFetch) Children() []Plan {
return []Plan{}
}
+func (t *traceIDFetch) Type() PlanType {
+ return PlanTraceIDFetch
+}
+
func (t *traceIDFetch) Schema() Schema {
return t.schema
}
@@ -74,10 +106,20 @@
cmp.Equal(t.metadata, other.metadata)
}
-func TraceIDFetch(traceID string, metadata *common.Metadata, schema Schema) Plan {
- return &traceIDFetch{
- metadata: metadata,
- schema: schema,
- traceID: traceID,
+func (t *traceIDFetch) Execute(ec executor.ExecutionContext) ([]data.Entity, error) {
+ traceData, err := ec.FetchTrace(*t.metadata, t.traceID, series.ScanOptions{
+ Projection: t.projectionFields,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return traceData.Entities, nil
+}
+
+func TraceIDFetch(traceID string, metadata *common.Metadata, projection ...string) UnresolvedPlan {
+ return &unresolvedTraceIDFetch{
+ metadata: metadata,
+ traceID: traceID,
+ projectionFields: projection,
}
}
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 8418fb4..00542a5 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -31,6 +31,9 @@
CreateRef(names ...string) ([]*FieldRef, error)
Map(refs ...*FieldRef) Schema
Equal(Schema) bool
+ ShardNumber() uint32
+ TraceIDFieldName() string
+ TraceStateFieldName() string
}
type fieldSpec struct {
@@ -45,8 +48,17 @@
var _ Schema = (*schema)(nil)
type schema struct {
- indexRule apischema.IndexRule
- fieldMap map[string]*fieldSpec
+ traceSeries *apiv1.TraceSeries
+ indexRule apischema.IndexRule
+ fieldMap map[string]*fieldSpec
+}
+
+func (s *schema) TraceIDFieldName() string {
+ return s.traceSeries.GetReservedFieldsMap().GetTraceId()
+}
+
+func (s *schema) TraceStateFieldName() string {
+ return s.traceSeries.GetReservedFieldsMap().GetState().GetField()
}
// IndexDefined checks whether the field given is indexed
@@ -100,11 +112,16 @@
return nil
}
newS := &schema{
- indexRule: s.indexRule,
- fieldMap: make(map[string]*fieldSpec),
+ traceSeries: s.traceSeries,
+ indexRule: s.indexRule,
+ fieldMap: make(map[string]*fieldSpec),
}
for _, ref := range refs {
newS.fieldMap[ref.name] = ref.spec
}
return newS
}
+
+func (s *schema) ShardNumber() uint32 {
+ return s.traceSeries.Shard.Number
+}