blob: 261b39e76eee2fc9c29d7e7bf77e2ee67ca1ded5 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package tsdb
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type IndexDatabase interface {
WriterBuilder() IndexWriterBuilder
Seek(field index.Field) ([]GlobalItemID, error)
}
type IndexWriter interface {
WriteLSMIndex(field index.Field) error
WriteInvertedIndex(field index.Field) error
}
type IndexWriterBuilder interface {
Scope(scope Entry) IndexWriterBuilder
Time(ts time.Time) IndexWriterBuilder
GlobalItemID(itemID GlobalItemID) IndexWriterBuilder
Build() (IndexWriter, error)
}
type IndexSeekBuilder interface{}
var _ IndexDatabase = (*indexDB)(nil)
type indexDB struct {
shardID common.ShardID
segCtrl *segmentController
}
func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
result := make([]GlobalItemID, 0)
f, err := field.Marshal()
if err != nil {
return nil, err
}
for _, s := range i.segCtrl.segments() {
err = s.globalIndex.GetAll(f, func(rawBytes []byte) error {
id := &GlobalItemID{}
errUnMarshal := id.UnMarshal(rawBytes)
if errUnMarshal != nil {
return errUnMarshal
}
result = append(result, *id)
return nil
})
if err == kv.ErrKeyNotFound {
return result, nil
}
}
return result, err
}
func (i *indexDB) WriterBuilder() IndexWriterBuilder {
return newIndexWriterBuilder(i.segCtrl)
}
func newIndexDatabase(_ context.Context, id common.ShardID, segCtrl *segmentController) (IndexDatabase, error) {
return &indexDB{
shardID: id,
segCtrl: segCtrl,
}, nil
}
var _ IndexWriterBuilder = (*indexWriterBuilder)(nil)
type indexWriterBuilder struct {
scope Entry
segCtrl *segmentController
ts time.Time
seg *segment
globalItemID *GlobalItemID
}
func (i *indexWriterBuilder) Scope(scope Entry) IndexWriterBuilder {
i.scope = scope
return i
}
func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
i.ts = ts
segs := i.segCtrl.span(timestamp.NewTimeRangeDuration(ts, 0, true, false))
if len(segs) != 1 {
return i
}
i.seg = segs[0]
return i
}
func (i *indexWriterBuilder) GlobalItemID(itemID GlobalItemID) IndexWriterBuilder {
i.globalItemID = &itemID
return i
}
func (i *indexWriterBuilder) Build() (IndexWriter, error) {
if i.seg == nil {
return nil, errors.WithStack(ErrNoTime)
}
if i.globalItemID == nil {
return nil, errors.WithStack(ErrNoVal)
}
return &indexWriter{
scope: i.scope,
seg: i.seg,
ts: i.ts,
itemID: i.globalItemID,
}, nil
}
func newIndexWriterBuilder(segCtrl *segmentController) IndexWriterBuilder {
return &indexWriterBuilder{
segCtrl: segCtrl,
}
}
var _ IndexWriter = (*indexWriter)(nil)
type indexWriter struct {
scope Entry
seg *segment
ts time.Time
itemID *GlobalItemID
}
func (i *indexWriter) WriteLSMIndex(field index.Field) error {
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
key, err := field.Marshal()
if err != nil {
return err
}
return i.seg.globalIndex.PutWithVersion(key, i.itemID.Marshal(), uint64(i.ts.UnixNano()))
}
func (i *indexWriter) WriteInvertedIndex(field index.Field) error {
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
key, err := field.Marshal()
if err != nil {
return err
}
return i.seg.globalIndex.PutWithVersion(key, i.itemID.Marshal(), uint64(i.ts.UnixNano()))
}
func GlobalSeriesID(scope Entry) common.SeriesID {
return common.SeriesID(convert.Hash(scope))
}