blob: fe94b4447b4cee287d3239442d6af9f771a50c8c [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package index
import (
"bytes"
"math"
"go.uber.org/multierr"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
type CompositePostingValueFn = func(term, value []byte, delegated kv.Iterator) (*PostingValue, error)
var (
_ FieldIterator = (*FieldIteratorTemplate)(nil)
DefaultUpper = convert.Uint64ToBytes(math.MaxUint64)
DefaultLower = convert.Uint64ToBytes(0)
)
type FieldIteratorTemplate struct {
delegated *delegateIterator
init bool
cur *PostingValue
err error
termRange RangeOpts
fn CompositePostingValueFn
reverse bool
seekKey []byte
}
func (f *FieldIteratorTemplate) Next() bool {
if !f.init {
f.init = true
f.delegated.Seek(f.seekKey)
}
if !f.delegated.Valid() {
return false
}
pv, err := f.fn(f.delegated.Field().Term, f.delegated.Val(), f.delegated)
if err != nil {
f.err = err
return false
}
in := f.termRange.Between(pv.Term)
switch {
case in > 0:
if f.reverse {
return f.Next()
}
return false
case in < 0:
if f.reverse {
return false
}
return f.Next()
}
f.cur = pv
return true
}
func (f *FieldIteratorTemplate) Val() *PostingValue {
return f.cur
}
func (f *FieldIteratorTemplate) Close() error {
return multierr.Append(f.err, f.delegated.Close())
}
func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, iterable kv.Iterable,
fn CompositePostingValueFn,
) (*FieldIteratorTemplate, error) {
if termRange.Upper == nil {
termRange.Upper = DefaultUpper
}
if termRange.Lower == nil {
termRange.Lower = DefaultLower
}
var reverse bool
var term []byte
switch order {
case modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_UNSPECIFIED:
term = termRange.Lower
reverse = false
case modelv1.Sort_SORT_DESC:
term = termRange.Upper
reverse = true
}
if order == modelv1.Sort_SORT_DESC {
reverse = true
}
iter := iterable.NewIterator(kv.ScanOpts{
Prefix: fieldKey.Marshal(),
Reverse: reverse,
})
field := Field{
Key: fieldKey,
Term: term,
}
seekKey, err := field.Marshal()
if err != nil {
return nil, err
}
return &FieldIteratorTemplate{
delegated: newDelegateIterator(iter, fieldKey, l),
termRange: termRange,
fn: fn,
reverse: reverse,
seekKey: seekKey,
}, nil
}
func parseKey(fieldKey FieldKey, key []byte) (Field, error) {
f := &Field{
Key: fieldKey,
}
err := f.Unmarshal(key)
if err != nil {
return *f, err
}
return *f, nil
}
type SwitchFn = func(a, b []byte) bool
var _ FieldIterator = (*mergedIterator)(nil)
type mergedIterator struct {
inner []FieldIterator
drained []FieldIterator
drainedCount int
cur *PostingValue
switchFn SwitchFn
init bool
closed bool
}
func NewMergedIterator(merged []FieldIterator, fn SwitchFn) FieldIterator {
return &mergedIterator{
inner: merged,
drained: make([]FieldIterator, len(merged)),
switchFn: fn,
}
}
func (m *mergedIterator) Next() bool {
if m.closed {
return false
}
if m.allDrained() {
return false
}
if !m.init {
for i, iterator := range m.inner {
if !iterator.Next() {
m.drain(i)
}
}
if m.allDrained() {
return false
}
m.init = true
}
var head FieldIterator
var headIndex int
for i, iterator := range m.inner {
if iterator == nil {
continue
}
if head == nil {
head = iterator
continue
}
if m.switchFn(head.Val().Term, iterator.Val().Term) {
head = iterator
headIndex = i
}
}
m.cur = head.Val()
if !head.Next() {
m.drain(headIndex)
}
return true
}
func (m *mergedIterator) Val() *PostingValue {
return m.cur
}
func (m *mergedIterator) Close() error {
m.closed = true
var err error
for _, iterator := range m.drained {
if iterator == nil {
continue
}
err = multierr.Append(err, iterator.Close())
}
return err
}
func (m *mergedIterator) drain(index int) {
m.drained[index], m.inner[index] = m.inner[index], nil
m.drainedCount++
}
func (m *mergedIterator) allDrained() bool {
return m.drainedCount == len(m.inner)
}
var _ kv.Iterator = (*delegateIterator)(nil)
type delegateIterator struct {
delegated kv.Iterator
fieldKey FieldKey
fieldKeyBytes []byte
l *logger.Logger
curField Field
closed bool
}
func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, l *logger.Logger) *delegateIterator {
fieldKeyBytes := fieldKey.Marshal()
return &delegateIterator{
delegated: delegated,
fieldKey: fieldKey,
fieldKeyBytes: fieldKeyBytes,
l: l,
}
}
func (di *delegateIterator) Next() {
di.delegated.Next()
}
func (di *delegateIterator) Rewind() {
di.delegated.Rewind()
}
func (di *delegateIterator) Seek(key []byte) {
di.delegated.Seek(key)
}
func (di *delegateIterator) Key() []byte {
return di.delegated.Key()
}
func (di *delegateIterator) Field() Field {
return di.curField
}
func (di *delegateIterator) Val() []byte {
return di.delegated.Val()
}
func (di *delegateIterator) Valid() bool {
if di.closed || !di.delegated.Valid() {
return false
}
var err error
di.curField, err = parseKey(di.fieldKey, di.Key())
if err != nil {
di.l.Error().Err(err).Msg("fail to parse field from key")
di.Close()
return false
}
if !bytes.Equal(di.curField.Key.Marshal(), di.fieldKeyBytes) {
di.l.Debug().
Uint64("series_id", uint64(di.fieldKey.SeriesID)).
Uint32("index_rule_id", di.fieldKey.IndexRuleID).
Msg("reached the limitation of the field(series_id+index_rule_id)")
di.Close()
return false
}
return true
}
func (di *delegateIterator) Close() error {
di.closed = true
return nil
}