Remove term metadata store (#148)
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 95a449c..cb7b01f 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -67,7 +67,7 @@
p.log.Warn().Msg("invalid event data type")
return
}
- p.log.Info().Msg("received a query event")
+ p.log.Debug().Stringer("criteria", queryCriteria).Msg("received a query request")
meta := queryCriteria.GetMetadata()
ec, err := p.streamService.Stream(meta)
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index 4c8b958..261b39e 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -58,7 +58,7 @@
func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
result := make([]GlobalItemID, 0)
- f, err := field.MarshalStraight()
+ f, err := field.Marshal()
if err != nil {
return nil, err
}
@@ -154,7 +154,7 @@
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
- key, err := field.MarshalStraight()
+ key, err := field.Marshal()
if err != nil {
return err
}
@@ -165,7 +165,7 @@
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
- key, err := field.MarshalStraight()
+ key, err := field.Marshal()
if err != nil {
return err
}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index ebedc32..192a94f 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -27,7 +27,6 @@
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
)
@@ -68,40 +67,11 @@
Term []byte
}
-func (f Field) MarshalStraight() ([]byte, error) {
+func (f Field) Marshal() ([]byte, error) {
return bytes.Join([][]byte{f.Key.Marshal(), f.Term}, nil), nil
}
-func (f Field) Marshal(term metadata.Term) ([]byte, error) {
- var t []byte
- if f.Key.EncodeTerm {
- var err error
- t, err = term.ID(f.Term)
- if err != nil {
- return nil, errors.Wrap(err, "get term id")
- }
- f.Term = t
- }
- return f.MarshalStraight()
-}
-
-func (f *Field) Unmarshal(term metadata.Term, raw []byte) error {
- err := f.UnmarshalStraight(raw)
- if err != nil {
- return err
- }
- if !f.Key.EncodeTerm {
- return nil
- }
- t, err := term.Literal(f.Term)
- if err != nil {
- return errors.Wrap(err, "get term literal from metadata store")
- }
- f.Term = t
- return nil
-}
-
-func (f *Field) UnmarshalStraight(raw []byte) error {
+func (f *Field) Unmarshal(raw []byte) error {
fk := &f.Key
err := fk.Unmarshal(raw[:len(raw)-8])
if err != nil {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 8073d19..c126a6b 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -29,7 +29,6 @@
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -38,7 +37,6 @@
var _ index.Store = (*store)(nil)
type store struct {
- termMetadata metadata.Term
diskTable kv.IndexStore
memTable *memTable
immutableMemTable *memTable
@@ -57,23 +55,15 @@
if err != nil {
return nil, err
}
- var md metadata.Term
- if md, err = metadata.NewTerm(metadata.TermOpts{
- Path: opts.Path + "/tmd",
- Logger: opts.Logger,
- }); err != nil {
- return nil, err
- }
return &store{
- memTable: newMemTable(),
- diskTable: diskTable,
- termMetadata: md,
- l: opts.Logger,
+ memTable: newMemTable(),
+ diskTable: diskTable,
+ l: opts.Logger,
}, nil
}
func (s *store) Close() error {
- return multierr.Combine(s.diskTable.Close(), s.termMetadata.Close())
+ return s.diskTable.Close()
}
func (s *store) Write(field index.Field, chunkID common.ItemID) error {
@@ -92,7 +82,7 @@
s.memTable = newMemTable()
}
err := s.diskTable.
- Handover(s.immutableMemTable.Iter(s.termMetadata))
+ Handover(s.immutableMemTable.Iter())
if err != nil {
return err
}
@@ -104,9 +94,6 @@
stat := s.mainStats()
disk := s.diskTable.Stats()
stat.MaxMemBytes = disk.MaxMemBytes
- term := s.termMetadata.Stats()
- stat.MemBytes += term.MemBytes
- stat.MaxMemBytes += term.MaxMemBytes
return stat
}
@@ -127,7 +114,7 @@
}
func (s *store) MatchTerms(field index.Field) (posting.List, error) {
- f, err := field.Marshal(s.termMetadata)
+ f, err := field.Marshal()
if err != nil {
return nil, err
}
@@ -197,7 +184,7 @@
}
iters = append(iters, it)
}
- it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable, s.termMetadata,
+ it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable,
func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
list := roaring.NewPostingList()
err := list.Unmarshall(val)
@@ -214,7 +201,7 @@
f := index.Field{
Key: fieldKey,
}
- err := f.Unmarshal(s.termMetadata, delegated.Key())
+ err := f.Unmarshal(delegated.Key())
if err != nil {
return nil, err
}
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index 4d78f69..1b70b2c 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -28,7 +28,6 @@
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
)
@@ -147,14 +146,13 @@
var _ kv.Iterator = (*flushIterator)(nil)
type flushIterator struct {
- fieldIdx int
- termIdx int
- key []byte
- value []byte
- fields *fieldMap
- valid bool
- err error
- termMetadata metadata.Term
+ fieldIdx int
+ termIdx int
+ key []byte
+ value []byte
+ fields *fieldMap
+ valid bool
+ err error
}
func (i *flushIterator) Next() {
@@ -228,7 +226,7 @@
Key: term.key,
Term: value.Term,
}
- i.key, err = f.Marshal(i.termMetadata)
+ i.key, err = f.Marshal()
if err != nil {
i.err = multierr.Append(i.err, err)
return false
@@ -236,9 +234,8 @@
return true
}
-func (m *memTable) Iter(termMetadata metadata.Term) kv.Iterator {
+func (m *memTable) Iter() kv.Iterator {
return &flushIterator{
- fields: m.fields,
- termMetadata: termMetadata,
+ fields: m.fields,
}
}
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index 559802f..fe94b44 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -26,7 +26,6 @@
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -89,7 +88,7 @@
}
func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, iterable kv.Iterable,
- metadata metadata.Term, fn CompositePostingValueFn,
+ fn CompositePostingValueFn,
) (*FieldIteratorTemplate, error) {
if termRange.Upper == nil {
termRange.Upper = DefaultUpper
@@ -118,12 +117,12 @@
Key: fieldKey,
Term: term,
}
- seekKey, err := field.Marshal(metadata)
+ seekKey, err := field.Marshal()
if err != nil {
return nil, err
}
return &FieldIteratorTemplate{
- delegated: newDelegateIterator(iter, fieldKey, metadata, l),
+ delegated: newDelegateIterator(iter, fieldKey, l),
termRange: termRange,
fn: fn,
reverse: reverse,
@@ -131,11 +130,11 @@
}, nil
}
-func parseKey(fieldKey FieldKey, metadata metadata.Term, key []byte) (Field, error) {
+func parseKey(fieldKey FieldKey, key []byte) (Field, error) {
f := &Field{
Key: fieldKey,
}
- err := f.Unmarshal(metadata, key)
+ err := f.Unmarshal(key)
if err != nil {
return *f, err
}
@@ -235,20 +234,18 @@
delegated kv.Iterator
fieldKey FieldKey
fieldKeyBytes []byte
- metadata metadata.Term
l *logger.Logger
curField Field
closed bool
}
-func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, metadata metadata.Term, l *logger.Logger) *delegateIterator {
+func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, l *logger.Logger) *delegateIterator {
fieldKeyBytes := fieldKey.Marshal()
return &delegateIterator{
delegated: delegated,
fieldKey: fieldKey,
fieldKeyBytes: fieldKeyBytes,
- metadata: metadata,
l: l,
}
}
@@ -282,7 +279,7 @@
return false
}
var err error
- di.curField, err = parseKey(di.fieldKey, di.metadata, di.Key())
+ di.curField, err = parseKey(di.fieldKey, di.Key())
if err != nil {
di.l.Error().Err(err).Msg("fail to parse field from key")
di.Close()
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index b2ef18c..53de61b 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -18,23 +18,19 @@
package lsm
import (
- "go.uber.org/multierr"
-
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
var _ index.Store = (*store)(nil)
type store struct {
- lsm kv.Store
- termMetadata metadata.Term
- l *logger.Logger
+ lsm kv.Store
+ l *logger.Logger
}
func (*store) Flush() error {
@@ -42,20 +38,15 @@
}
func (s *store) Stats() observability.Statistics {
- store := s.lsm.Stats()
- term := s.termMetadata.Stats()
- return observability.Statistics{
- MemBytes: store.MemBytes + term.MemBytes,
- MaxMemBytes: store.MaxMemBytes + term.MaxMemBytes,
- }
+ return s.lsm.Stats()
}
func (s *store) Close() error {
- return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
+ return s.lsm.Close()
}
func (s *store) Write(field index.Field, itemID common.ItemID) error {
- f, err := field.Marshal(s.termMetadata)
+ f, err := field.Marshal()
if err != nil {
return err
}
@@ -74,16 +65,8 @@
if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil {
return nil, err
}
- var md metadata.Term
- if md, err = metadata.NewTerm(metadata.TermOpts{
- Path: opts.Path + "/tmd",
- Logger: opts.Logger,
- }); err != nil {
- return nil, err
- }
return &store{
- lsm: lsm,
- termMetadata: md,
- l: opts.Logger,
+ lsm: lsm,
+ l: opts.Logger,
}, nil
}
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
index 43f1948..5e877fa 100644
--- a/pkg/index/lsm/search.go
+++ b/pkg/index/lsm/search.go
@@ -37,7 +37,7 @@
}
func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
- f, err := field.Marshal(s.termMetadata)
+ f, err := field.Marshal()
if err != nil {
return nil, err
}
@@ -66,7 +66,7 @@
}
func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) {
- return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm, s.termMetadata,
+ return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm,
func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
pv := &index.PostingValue{
Term: term,
@@ -75,7 +75,7 @@
for ; delegated.Valid(); delegated.Next() {
f := index.Field{}
- err := f.Unmarshal(s.termMetadata, delegated.Key())
+ err := f.Unmarshal(delegated.Key())
if err != nil {
return nil, err
}
diff --git a/pkg/index/metadata/metadata.go b/pkg/index/metadata/metadata.go
deleted file mode 100644
index 3e150e0..0000000
--- a/pkg/index/metadata/metadata.go
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package metadata
-
-import (
- "io"
-
- "github.com/pkg/errors"
-
- "github.com/apache/skywalking-banyandb/banyand/kv"
- "github.com/apache/skywalking-banyandb/banyand/observability"
- "github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-type Term interface {
- observability.Observable
- ID(term []byte) (id []byte, err error)
- Literal(id []byte) (term []byte, err error)
- io.Closer
-}
-
-var _ Term = (*term)(nil)
-
-type term struct {
- store kv.Store
-}
-
-type TermOpts struct {
- Path string
- Logger *logger.Logger
-}
-
-func NewTerm(opts TermOpts) (Term, error) {
- var store kv.Store
- var err error
- if store, err = kv.OpenStore(0, opts.Path, kv.StoreWithNamedLogger("term_metadata", opts.Logger)); err != nil {
- return nil, err
- }
- return &term{
- store: store,
- }, nil
-}
-
-func (t *term) ID(term []byte) (id []byte, err error) {
- id = convert.Uint64ToBytes(convert.Hash(term))
- _, err = t.store.Get(id)
- if errors.Is(err, kv.ErrKeyNotFound) {
- return id, t.store.Put(id, term)
- }
- return id, nil
-}
-
-func (t *term) Literal(id []byte) (term []byte, err error) {
- return t.store.Get(id)
-}
-
-func (t *term) Close() error {
- return t.store.Close()
-}
-
-func (t *term) Stats() observability.Statistics {
- return t.store.Stats()
-}
diff --git a/pkg/index/testcases/service_name.go b/pkg/index/testcases/service_name.go
index c6e174d..df5e067 100644
--- a/pkg/index/testcases/service_name.go
+++ b/pkg/index/testcases/service_name.go
@@ -31,7 +31,7 @@
var serviceName = index.FieldKey{
// http_method
IndexRuleID: 6,
- EncodeTerm: true,
+ EncodeTerm: false,
}
func RunServiceName(t *testing.T, store SimpleStore) {
diff --git a/scripts/build/test.mk b/scripts/build/test.mk
index 8a2bdf8..ad65bdf 100644
--- a/scripts/build/test.mk
+++ b/scripts/build/test.mk
@@ -41,13 +41,13 @@
$(GINKGO) $(TEST_OPTS) $(TEST_EXTRA_OPTS) -tags "$(TEST_TAGS)" $(TEST_PKG_LIST)
.PHONY: test-race
-test-race: TEST_OPTS=-race
+test-race: TEST_OPTS=--race
test-race: test ## Run all the unit tests with race detector on
.PHONY: test-coverage
-test-coverage: ## Run all the unit tests with coverage analysis on
+test-coverage: $(GINKGO) generate ## Run all the unit tests with coverage analysis on
mkdir -p "$(TEST_COVERAGE_DIR)"
- go test $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) -coverprofile="$(TEST_COVERAGE_PROFILE)" -tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
+ $(GINKGO) $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) --coverprofile="$(TEST_COVERAGE_PROFILE)" --tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
go tool cover -html="$(TEST_COVERAGE_PROFILE)" -o "$(TEST_COVERAGE_REPORT)"
@echo "Test coverage report has been saved to $(TEST_COVERAGE_REPORT)"