blob: 64985f3c39c73406cc8bf4d3b5999d8996b2a7bb [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package trace
import (
"encoding/hex"
"time"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/partition"
"github.com/apache/skywalking-banyandb/pkg/pb"
"github.com/apache/skywalking-banyandb/pkg/posting"
"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
)
// FetchTrace loads the entities indexed under traceID.
//
// The trace ID is mapped to a shard with partition.ShardID, the trace index of
// that shard is read to obtain the (shard, chunk) locations recorded for the
// trace, and the chunks are then fetched shard by shard through FetchEntity
// using opt.
//
// It returns ErrInvalidTraceID for an empty traceID, the reader's error when
// the index lookup fails, and a zero-value Trace with a nil error when the
// index holds nothing for traceID. Undecodable index entries and per-shard
// fetch failures are accumulated into the returned error; entities of the
// shards that were fetched without error are still returned alongside it.
func (t *traceSeries) FetchTrace(traceID string, opt series.ScanOptions) (trace data.Trace, err error) {
	if traceID == "" {
		return trace, ErrInvalidTraceID
	}
	traceIDBytes := []byte(traceID)
	traceIDShardID := partition.ShardID(traceIDBytes, t.shardNum)
	bb, errTraceID := t.reader.TimeSeriesReader(traceIDShardID, traceIndex, 0, 0).GetAll(traceIDBytes)
	if errTraceID != nil {
		return trace, errTraceID
	}
	t.l.Debug().Uint("shard_id", traceIDShardID).
		Str("trace_id", traceID).
		Hex("trace_id_bytes", traceIDBytes).
		Int("chunk_num", len(bb)).Msg("fetch Trace by trace_id")
	if len(bb) < 1 {
		return trace, nil
	}

	// Each index entry is decoded as a 2-byte shard ID followed by a uint64
	// chunk ID.
	const (
		shardIDLen = 2
		chunkIDLen = 8
	)
	dataMap := make(map[uint]posting.List)
	for _, b := range bb {
		// Slicing a shorter entry would panic, so report it and move on
		// instead of taking down the whole query.
		if len(b) < shardIDLen+chunkIDLen {
			err = multierr.Append(err, errors.New("invalid trace index entry: "+hex.EncodeToString(b)))
			continue
		}
		id := idWithShard{
			id:      common.ChunkID(convert.BytesToUint64(b[shardIDLen:])),
			shardID: uint(convert.BytesToUint16(b[:shardIDLen])),
		}
		placeID(dataMap, id)
	}

	var entities []data.Entity
	for s, c := range dataMap {
		ee, errEntity := t.FetchEntity(c, s, opt)
		if errEntity != nil {
			err = multierr.Append(err, errEntity)
			continue
		}
		entities = append(entities, ee...)
	}
	return data.Trace{
		KindVersion: data.TraceKindVersion,
		Entities:    entities,
	}, err
}
// ScanEntity collects entities from every shard by walking the start-time
// index.
//
// For each state selected by opt.State (success, error, or both for the
// default state) the scan is seeded with the key state+startTime. Entries
// whose timestamp is greater than endTime are skipped, and a scan stops as
// soon as a key belonging to another state is seen. The chunk IDs found on a
// shard are resolved into entities through FetchEntity using opt.
//
// At most opt.Limit chunk IDs are collected across all shards and states; a
// limit below 1 falls back to 10. Once the limit is reached no further scans
// are started.
//
// Scan and fetch failures are accumulated into the returned error while the
// entities gathered from the other shards are still returned.
func (t *traceSeries) ScanEntity(startTime, endTime uint64, opt series.ScanOptions) ([]data.Entity, error) {
	// Index key layout as decoded below: 1 byte of state, 8 bytes of
	// timestamp, then the chunk ID.
	const (
		stateLen  = 1
		tsLen     = 8
		prefixLen = stateLen + tsLen
	)

	total := opt.Limit
	if total < 1 {
		total = 10
	}

	states := make([]byte, 0, 2)
	switch opt.State {
	case series.TraceStateSuccess:
		states = append(states, StateSuccess)
	case series.TraceStateError:
		states = append(states, StateError)
	case series.TraceStateDefault:
		states = append(states, StateSuccess, StateError)
	}

	seekKeys := make([][]byte, 0, len(states))
	startTimeBytes := convert.Uint64ToBytes(startTime)
	for _, state := range states {
		key := make([]byte, prefixLen)
		key[0] = state
		copy(key[stateLen:], startTimeBytes)
		seekKeys = append(seekKeys, key)
	}

	entities := make([]data.Entity, 0, total)
	var num uint32 // chunk IDs collected so far, across all shards and states
	opts := kv.DefaultScanOpts
	opts.PrefetchValues = false
	opts.PrefetchSize = int(total)
	var errAll error
	for i := uint(0); i < uint(t.shardNum) && num < total; i++ {
		chunkIDs := roaring.NewPostingList()
		for _, seekKey := range seekKeys {
			// The limit is global: do not start another scan once it is met,
			// otherwise every extra scan would add one more chunk.
			if num >= total {
				break
			}
			state := seekKey[0]
			err := t.reader.Reader(i, startTimeIndex, startTime, endTime).Scan(
				seekKey,
				opts,
				func(_ int, key []byte, _ func() ([]byte, error)) error {
					if len(key) <= prefixLen {
						return errors.Wrapf(ErrInvalidKey, "key:%s", hex.EncodeToString(key))
					}
					// Keys of a different state mean this state's range is over.
					if key[0] != state {
						return kv.ErrStopScan
					}
					ts := convert.BytesToUint64(key[stateLen:prefixLen])
					if ts > endTime {
						return nil
					}
					chunkIDs.Insert(common.ChunkID(convert.BytesToUint64(key[prefixLen:])))
					num++
					// Stop right after the chunk that fills the limit.
					if num >= total {
						return kv.ErrStopScan
					}
					return nil
				})
			if err != nil {
				errAll = multierr.Append(errAll, err)
			}
		}
		if chunkIDs.IsEmpty() {
			continue
		}
		ee, err := t.FetchEntity(chunkIDs, i, opt)
		if err != nil {
			errAll = multierr.Append(errAll, err)
			continue
		}
		entities = append(entities, ee...)
	}
	return entities, errAll
}
// FetchEntity resolves the chunk IDs of a single shard into entities.
//
// For every chunk ID the timestamp is extracted with the ID generator, the
// chunk-ID mapping is read to obtain the internal reference (state and series
// ID), and the entity is loaded with the projection described by opt.
//
// It returns ErrChunkIDsEmpty when chunkIDs holds no ID, the error of
// parseFetchInfo when the projection cannot be resolved, and
// ErrProjectionEmpty when the projection selects neither the data binary nor
// any field. Failures that concern a single chunk are accumulated into the
// returned error and that chunk is skipped; the entities that were loaded
// successfully are returned together with the accumulated error.
func (t *traceSeries) FetchEntity(chunkIDs posting.List, shardID uint, opt series.ScanOptions) (entities []data.Entity, err error) {
	chunkIDsLen := chunkIDs.Len()
	if chunkIDsLen < 1 {
		return nil, ErrChunkIDsEmpty
	}
	fetchDataBinary, fetchFieldsIndices, errInfo := t.parseFetchInfo(opt)
	if errInfo != nil {
		return nil, errInfo
	}
	if !fetchDataBinary && len(fetchFieldsIndices) < 1 {
		return nil, ErrProjectionEmpty
	}

	// Internal reference layout as decoded below: 1 byte of state, the series
	// ID (logged as a uint64, hence at least 8 bytes), and 8 trailing bytes
	// that are not used here.
	const (
		refStateLen    = 1
		refSeriesIDLen = 8
		refTrailerLen  = 8
		minRefLen      = refStateLen + refSeriesIDLen + refTrailerLen
	)

	entities = make([]data.Entity, 0, chunkIDsLen)
	for iter := chunkIDs.Iterator(); iter.Next(); {
		id := iter.Current()
		chunkID := uint64(id)
		ts, errParseTS := t.idGen.ParseTS(chunkID)
		if errParseTS != nil {
			// Without a valid timestamp the lookups below cannot target the
			// right time range, so skip this chunk.
			err = multierr.Append(err, errParseTS)
			continue
		}
		ref, chunkErr := t.reader.Reader(shardID, chunkIDMapping, ts, ts).Get(convert.Uint64ToBytes(chunkID))
		if chunkErr != nil {
			err = multierr.Append(err, chunkErr)
			continue
		}
		// A missing or truncated mapping value would make the slicing below
		// panic; report it and carry on with the other chunks.
		if len(ref) < minRefLen {
			err = multierr.Append(err, errors.New("invalid internal ref: "+hex.EncodeToString(ref)))
			continue
		}
		sRef := ref[:len(ref)-refTrailerLen]
		seriesID := sRef[refStateLen:]
		state := sRef[0]
		t.l.Debug().
			Uint64("chunk_id", chunkID).
			Hex("id", ref).
			Uint64("series_id", convert.BytesToUint64(seriesID)).
			Uint("shard_id", shardID).
			Time("ts", time.Unix(0, int64(ts))).
			Uint64("ts_int", ts).
			Msg("fetch internal id by chunk_id")
		entity, errGet := t.getEntityByInternalRef(seriesID, State(state), fetchDataBinary, fetchFieldsIndices, shardID, ts)
		if errGet != nil {
			err = multierr.Append(err, errGet)
			continue
		}
		t.l.Debug().
			Str("entity_id", entity.GetEntityId()).
			Int("fields_num", len(entity.GetFields())).
			Int("data_binary_size_bytes", len(entity.GetDataBinary())).
			Msg("fetch entity")
		entities = append(entities, entity)
	}
	return entities, err
}
// parseFetchInfo translates opt.Projection into a fetch plan.
//
// fetchDataBinary is true when the projection names common.DataBinaryFieldName.
// Every other projected name is looked up in t.fieldIndex and returned, in
// projection order, as a pb.FieldEntry carrying the name and its index. A name
// that is not present in t.fieldIndex aborts the parsing with ErrFieldNotFound
// wrapped with the offending name.
func (t *traceSeries) parseFetchInfo(opt series.ScanOptions) (fetchDataBinary bool, fetchFieldsIndices []pb.FieldEntry, err error) {
	fetchFieldsIndices = make([]pb.FieldEntry, 0)
	for _, name := range opt.Projection {
		if name != common.DataBinaryFieldName {
			position, known := t.fieldIndex[name]
			if !known {
				return false, nil, errors.Wrapf(ErrFieldNotFound, "field name:%s", name)
			}
			entry := pb.FieldEntry{
				Key:   name,
				Index: position,
			}
			fetchFieldsIndices = append(fetchFieldsIndices, entry)
			t.l.Debug().Str("name", name).Int("index", position).Msg("to fetch the field")
			continue
		}
		// The data binary is requested through a reserved field name.
		fetchDataBinary = true
		t.l.Debug().Msg("to fetch data binary")
	}
	return fetchDataBinary, fetchFieldsIndices, nil
}
// getEntityByInternalRef loads one entity identified by seriesID and ts from
// the given shard.
//
// state selects, through getStoreName, the store holding the entity's fields
// and the store holding its data binary. The fields record is always read and
// unmarshalled as a v1.EntityValue to obtain the entity ID and timestamp;
// the fields listed in fetchFieldsIndices are copied when there are any, and
// the data binary is read only when fetchDataBinary is set.
//
// Any store lookup, read or unmarshalling error is returned with a zero-value
// entity.
func (t *traceSeries) getEntityByInternalRef(seriesID []byte, state State, fetchDataBinary bool,
	fetchFieldsIndices []pb.FieldEntry, shardID uint, ts uint64) (data.Entity, error) {
	fieldsStore, dataStore, storeErr := getStoreName(state)
	if storeErr != nil {
		return data.Entity{}, storeErr
	}

	rawFields, fieldsErr := t.reader.TimeSeriesReader(shardID, fieldsStore, ts, ts).Get(seriesID, ts)
	if fieldsErr != nil {
		return data.Entity{}, fieldsErr
	}

	// Decode the stored record.
	stored := &v1.EntityValue{}
	unmarshalErr := proto.Unmarshal(rawFields, stored)
	if unmarshalErr != nil {
		return data.Entity{}, unmarshalErr
	}

	// Build the entity handed back to the caller.
	result := &v1.Entity{
		EntityId:  stored.GetEntityId(),
		Timestamp: stored.GetTimestamp(),
	}
	// Only the projected fields are copied over.
	if len(fetchFieldsIndices) > 0 {
		result.Fields = pb.Transform(stored, fetchFieldsIndices)
	}
	if fetchDataBinary {
		rawBinary, binaryErr := t.reader.TimeSeriesReader(shardID, dataStore, ts, ts).Get(seriesID, ts)
		if binaryErr != nil {
			return data.Entity{}, binaryErr
		}
		result.DataBinary = rawBinary
	}
	return data.Entity{Entity: result}, nil
}
// idWithShard pairs a chunk ID with the ID of the shard it was recorded for.
type idWithShard struct {
	// id is the chunk ID.
	id common.ChunkID
	// shardID is the shard the chunk belongs to; placeID groups chunk IDs by it.
	shardID uint
}
// placeID adds data.id to the posting list kept for data.shardID in
// chunkIDCriteria, creating and registering a new roaring posting list when
// the shard has no list yet.
func placeID(chunkIDCriteria map[uint]posting.List, data idWithShard) {
	bucket, exists := chunkIDCriteria[data.shardID]
	if !exists {
		bucket = roaring.NewPostingList()
		chunkIDCriteria[data.shardID] = bucket
	}
	bucket.Insert(data.id)
}