blob: 585a746cf0ad30d979b43dd44773394ef3d1ffa7 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package logical
import (
"context"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/series"
seriesSchema "github.com/apache/skywalking-banyandb/banyand/series/schema"
"github.com/apache/skywalking-banyandb/banyand/series/schema/sw"
)
var (
ErrFieldNotDefined = errors.New("field is not defined")
ErrInvalidConditionType = errors.New("invalid pair type")
ErrIncompatibleQueryCondition = errors.New("incompatible query condition type")
ErrInvalidSchema = errors.New("invalid schema")
ErrIndexNotDefined = errors.New("index is not define for the field")
)
var (
DefaultLimit uint32 = 20
)
type Analyzer struct {
traceSeries seriesSchema.TraceSeries
indexRule seriesSchema.IndexRule
}
func DefaultAnalyzer() *Analyzer {
return &Analyzer{
sw.NewTraceSeries(),
sw.NewIndexRule(),
}
}
func (a *Analyzer) BuildTraceSchema(ctx context.Context, metadata common.Metadata) (Schema, error) {
traceSeries, err := a.traceSeries.Get(ctx, metadata)
if err != nil {
return nil, err
}
indexRule, err := a.indexRule.Get(ctx, metadata)
if err != nil {
return nil, err
}
s := &schema{
traceSeries: traceSeries.Spec,
indexRule: indexRule,
fieldMap: make(map[string]*fieldSpec),
}
// generate the schema of the fields for the traceSeries
for i, f := range traceSeries.Spec.GetFields() {
s.RegisterField(f.GetName(), i, f)
}
return s, nil
}
func (a *Analyzer) Analyze(_ context.Context, criteria *apiv1.QueryRequest, traceMetadata *common.Metadata, s Schema) (Plan, error) {
// parse tableScan
timeRange := criteria.GetTimeRange()
// parse projection
projStr := criteria.GetProjection().GetKeyNames()
var plan UnresolvedPlan
// parse selection
if criteria.GetFields() != nil && len(criteria.GetFields()) > 0 {
// mark if there is indexScan request
useIndexScan := false
var fieldExprs []Expr
traceState := series.TraceStateDefault
for _, pairQuery := range criteria.GetFields() {
op := pairQuery.GetOp()
typedPair := pairQuery.GetCondition()
switch v := typedPair.GetTyped().(type) {
case *apiv1.TypedPair_StrPair:
// check special field `trace_id`
if v.StrPair.GetKey() == s.TraceIDFieldName() {
plan = TraceIDFetch(v.StrPair.GetValues()[0], traceMetadata, projStr...)
break
}
useIndexScan = true
lit := parseStrLiteral(v.StrPair)
fieldExprs = append(fieldExprs, binaryOpFactory[op](NewFieldRef(v.StrPair.GetKey()), lit))
case *apiv1.TypedPair_IntPair:
// check special field `state`
if v.IntPair.GetKey() == s.TraceStateFieldName() {
traceState = series.TraceState(v.IntPair.GetValues()[0])
continue
}
lit := parseIntLiteral(v.IntPair)
fieldExprs = append(fieldExprs, binaryOpFactory[op](NewFieldRef(v.IntPair.GetKey()), lit))
useIndexScan = true
default:
return nil, ErrInvalidConditionType
}
}
// if plan is already assigned, skip
if plan == nil {
// first check if we can use index-scan
if useIndexScan {
plan = IndexScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, fieldExprs, traceState, projStr...)
} else {
plan = TableScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, traceState, projStr...)
}
}
} else {
plan = TableScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, series.TraceStateDefault, projStr...)
}
// parse orderBy
queryOrder := criteria.GetOrderBy()
if queryOrder != nil {
plan = OrderBy(plan, queryOrder.GetKeyName(), queryOrder.GetSort())
}
// parse offset
plan = Offset(plan, criteria.GetOffset())
// parse limit
limitParameter := criteria.GetLimit()
if limitParameter == 0 {
limitParameter = DefaultLimit
}
plan = Limit(plan, limitParameter)
return plan.Analyze(s)
}
func parseStrLiteral(pair *apiv1.StrPair) Expr {
if len(pair.GetValues()) == 1 {
return &strLiteral{
pair.GetValues()[0],
}
}
return &strArrLiteral{arr: pair.GetValues()}
}
func parseIntLiteral(pair *apiv1.IntPair) Expr {
if len(pair.GetValues()) == 1 {
return &int64Literal{
pair.GetValues()[0],
}
}
return &int64ArrLiteral{arr: pair.GetValues()}
}