Parameterize memory size and make some key improvements (#153)
* Parameterize memory size
* Fix checking failure
* Extract default values as several constants
* Print strategy manager state after testing
* Add a mock clock trigger to send the "now" to "ticker.C" in each
verification round
* Update LICENSE
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
diff --git a/Makefile b/Makefile
index c888fdb..c1f0de8 100644
--- a/Makefile
+++ b/Makefile
@@ -64,7 +64,7 @@
include scripts/build/ginkgo.mk
test-ci: $(GINKGO) ## Run the unit tests in CI
- $(GINKGO) --skip-package=test/stress --race --cover --covermode atomic --coverprofile=coverage.out ./...
+ $(GINKGO) -v --skip-package=test/stress --race --cover --covermode atomic --coverprofile=coverage.out ./...
##@ Code quality targets
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 9ae630f..f40d05d 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -126,6 +126,17 @@
}
}
+func TSSWithMemTableSize(size int64) TimeSeriesOptions {
+ return func(store TimeSeriesStore) {
+ if size < 1 {
+ return
+ }
+ if btss, ok := store.(*badgerTSS); ok {
+ btss.dbOpts.MemTableSize = size
+ }
+ }
+}
+
type Iterator interface {
Next()
Rewind()
@@ -160,6 +171,9 @@
}
// Put all values into LSM
btss.dbOpts = btss.dbOpts.WithVLogPercentile(1.0)
+ if btss.dbOpts.MemTableSize < 8<<20 {
+ btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
+ }
var err error
btss.db, err = badger.Open(btss.dbOpts)
if err != nil {
@@ -187,6 +201,18 @@
}
}
+// StoreWithMemTableSize sets MemTable size
+func StoreWithMemTableSize(size int64) StoreOptions {
+ return func(store Store) {
+ if size < 1 {
+ return
+ }
+ if bdb, ok := store.(*badgerDB); ok {
+ bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
+ }
+ }
+}
+
// OpenStore creates a new Store
func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) {
bdb := new(badgerDB)
@@ -196,6 +222,9 @@
opt(bdb)
}
bdb.dbOpts = bdb.dbOpts.WithNumVersionsToKeep(math.MaxUint32)
+ if bdb.dbOpts.MemTableSize > 0 && bdb.dbOpts.MemTableSize < 8<<20 {
+ bdb.dbOpts = bdb.dbOpts.WithValueThreshold(1 << 10)
+ }
var err error
bdb.db, err = badger.Open(bdb.dbOpts)
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2fad3d5..ef235a3 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -39,7 +39,9 @@
metadata metadata.Repo
}
-func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, l *logger.Logger) schemaRepo {
+func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo,
+ dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+) schemaRepo {
return schemaRepo{
l: l,
metadata: metadata,
@@ -47,7 +49,7 @@
metadata,
repo,
l,
- newSupplier(path, metadata, l),
+ newSupplier(path, metadata, dbOpts, l),
event.MeasureTopicShardEvent,
event.MeasureTopicEntityEvent,
),
@@ -168,13 +170,19 @@
type supplier struct {
path string
+ dbOpts tsdb.DatabaseOpts
metadata metadata.Repo
l *logger.Logger
}
-func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) *supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+ dbOpts.EncodingMethod = tsdb.EncodingMethod{
+ EncoderPool: newEncoderPool(plainChunkSize, intChunkSize, l),
+ DecoderPool: newDecoderPool(plainChunkSize, intChunkSize, l),
+ }
return &supplier{
path: path,
+ dbOpts: dbOpts,
metadata: metadata,
l: l,
}
@@ -195,17 +203,13 @@
}
func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
+ opts := s.dbOpts
+ opts.ShardNum = groupSchema.ResourceOpts.ShardNum
+ opts.Location = path.Join(s.path, groupSchema.Metadata.Name)
return tsdb.OpenDatabase(
context.WithValue(context.Background(), common.PositionKey, common.Position{
Module: "measure",
Database: groupSchema.Metadata.Name,
}),
- tsdb.DatabaseOpts{
- Location: path.Join(s.path, groupSchema.Metadata.Name),
- ShardNum: groupSchema.ResourceOpts.ShardNum,
- EncodingMethod: tsdb.EncodingMethod{
- EncoderPool: newEncoderPool(plainChunkSize, intChunkSize, s.l),
- DecoderPool: newDecoderPool(plainChunkSize, intChunkSize, s.l),
- },
- })
+ opts)
}
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 18239a2..26219d2 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -30,6 +30,7 @@
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
@@ -50,11 +51,13 @@
var _ Service = (*service)(nil)
type service struct {
+ root string
+ dbOpts tsdb.DatabaseOpts
+
schemaRepo schemaRepo
writeListener *writeCallback
l *logger.Logger
metadata metadata.Repo
- root string
pipeline queue.Queue
repo discovery.ServiceRepo
// stop channel for the service
@@ -76,6 +79,8 @@
func (s *service) FlagSet() *run.FlagSet {
flagS := run.NewFlagSet("storage")
flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of database")
+ flagS.Int64Var(&s.dbOpts.BlockMemSize, "measure-block-mem-size", 16<<20, "block memory size")
+ flagS.Int64Var(&s.dbOpts.SeriesMemSize, "measure-seriesmeta-mem-size", 1<<20, "series metadata memory size")
return flagS
}
@@ -98,7 +103,7 @@
if err != nil {
return err
}
- s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.l)
+ s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l)
for _, g := range groups {
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
continue
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 0895b16..172ed7a 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -41,7 +41,9 @@
metadata metadata.Repo
}
-func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, l *logger.Logger) schemaRepo {
+func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo,
+ dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+) schemaRepo {
return schemaRepo{
l: l,
metadata: metadata,
@@ -49,7 +51,7 @@
metadata,
repo,
l,
- newSupplier(path, metadata, l),
+ newSupplier(path, metadata, dbOpts, l),
event.StreamTopicShardEvent,
event.StreamTopicEntityEvent,
),
@@ -170,13 +172,19 @@
type supplier struct {
path string
+ dbOpts tsdb.DatabaseOpts
metadata metadata.Repo
l *logger.Logger
}
-func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) *supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+ dbOpts.EncodingMethod = tsdb.EncodingMethod{
+ EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
+ DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
+ }
return &supplier{
path: path,
+ dbOpts: dbOpts,
metadata: metadata,
l: l,
}
@@ -197,17 +205,13 @@
}
func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
+ opts := s.dbOpts
+ opts.ShardNum = groupSchema.ResourceOpts.ShardNum
+ opts.Location = path.Join(s.path, groupSchema.Metadata.Name)
return tsdb.OpenDatabase(
context.WithValue(context.Background(), common.PositionKey, common.Position{
Module: "stream",
Database: groupSchema.Metadata.Name,
}),
- tsdb.DatabaseOpts{
- Location: path.Join(s.path, groupSchema.Metadata.Name),
- ShardNum: groupSchema.ResourceOpts.ShardNum,
- EncodingMethod: tsdb.EncodingMethod{
- EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
- DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
- },
- })
+ opts)
}
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index 1109ad4..2a7f555 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -30,6 +30,7 @@
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -49,11 +50,13 @@
var _ Service = (*service)(nil)
type service struct {
+ root string
+ dbOpts tsdb.DatabaseOpts
+
schemaRepo schemaRepo
writeListener *writeCallback
l *logger.Logger
metadata metadata.Repo
- root string
pipeline queue.Queue
repo discovery.ServiceRepo
// stop channel for the service
@@ -71,6 +74,9 @@
func (s *service) FlagSet() *run.FlagSet {
flagS := run.NewFlagSet("storage")
flagS.StringVar(&s.root, "stream-root-path", "/tmp", "the root path of database")
+ flagS.Int64Var(&s.dbOpts.BlockMemSize, "stream-block-mem-size", 8<<20, "block memory size")
+ flagS.Int64Var(&s.dbOpts.SeriesMemSize, "stream-seriesmeta-mem-size", 1<<20, "series metadata memory size")
+ flagS.Int64Var(&s.dbOpts.GlobalIndexMemSize, "stream-global-index-mem-size", 2<<20, "global index memory size")
return flagS
}
@@ -93,7 +99,7 @@
if err != nil {
return err
}
- s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.l)
+ s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l)
for _, g := range groups {
if g.Catalog != commonv1.Catalog_CATALOG_STREAM {
continue
@@ -149,5 +155,8 @@
metadata: metadata,
repo: repo,
pipeline: pipeline,
+ dbOpts: tsdb.DatabaseOpts{
+ EnableGlobalIndex: true,
+ },
}, nil
}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 83854ac..1af3871 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -93,10 +93,11 @@
sm.db = db
sm.indexWriter = index.NewWriter(ctx, index.WriterOptions{
- DB: db,
- ShardNum: shardNum,
- Families: spec.schema.TagFamilies,
- IndexRules: spec.indexRules,
+ DB: db,
+ ShardNum: shardNum,
+ Families: spec.schema.TagFamilies,
+ IndexRules: spec.indexRules,
+ EnableGlobalIndex: true,
})
return sm, nil
}
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 3419a34..18393c0 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -19,15 +19,15 @@
import (
"context"
+ "fmt"
"io"
"path"
+ "runtime"
"strconv"
"sync"
+ "sync/atomic"
"time"
- "github.com/dgraph-io/ristretto/z"
- "go.uber.org/atomic"
-
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -37,7 +37,6 @@
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/lsm"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -45,16 +44,21 @@
componentMain = "main"
componentSecondInvertedIdx = "inverted"
componentSecondLSMIdx = "lsm"
+
+ defaultMainMemorySize = 8 << 20
)
type block struct {
- path string
- l *logger.Logger
- suffix string
- ref *z.Closer
- lock sync.RWMutex
- closed *atomic.Bool
- position common.Position
+ path string
+ l *logger.Logger
+ queue bucket.Queue
+ suffix string
+ ref *atomic.Int32
+ closed *atomic.Bool
+ lock sync.RWMutex
+ position common.Position
+ memSize int64
+ lsmMemSize int64
store kv.TimeSeriesStore
invertedIndex index.Store
@@ -65,8 +69,7 @@
segID uint16
blockID uint16
encodingMethod EncodingMethod
- flushCh *run.Chan[struct{}]
- flushChQueue chan *run.Chan[struct{}]
+ flushCh chan struct{}
}
type blockOpts struct {
@@ -75,6 +78,7 @@
startTime time.Time
suffix string
path string
+ queue bucket.Queue
}
func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
@@ -84,60 +88,65 @@
}
id := GenerateInternalID(opts.blockSize.Unit, suffixInteger)
timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false)
- encodingMethodObject := ctx.Value(encodingMethodKey)
- if encodingMethodObject == nil {
- encodingMethodObject = EncodingMethod{
- EncoderPool: encoding.NewPlainEncoderPool(0),
- DecoderPool: encoding.NewPlainDecoderPool(0),
- }
- }
clock, _ := timestamp.GetClock(ctx)
b = &block{
- segID: opts.segID,
- blockID: id,
- path: opts.path,
- l: logger.Fetch(ctx, "block"),
- TimeRange: timeRange,
- Reporter: bucket.NewTimeBasedReporter(timeRange, clock),
- closed: atomic.NewBool(true),
- encodingMethod: encodingMethodObject.(EncodingMethod),
- flushChQueue: make(chan *run.Chan[struct{}]),
+ segID: opts.segID,
+ blockID: id,
+ path: opts.path,
+ l: logger.Fetch(ctx, "block"),
+ TimeRange: timeRange,
+ Reporter: bucket.NewTimeBasedReporter(timeRange, clock),
+ flushCh: make(chan struct{}),
+ ref: &atomic.Int32{},
+ closed: &atomic.Bool{},
+ queue: opts.queue,
}
+ b.options(ctx)
position := ctx.Value(common.PositionKey)
if position != nil {
b.position = position.(common.Position)
}
go func() {
- for {
- ch := <-b.flushChQueue
- for {
- _, more := ch.Read()
- if !more {
- break
- }
- b.flush()
- }
+ for range b.flushCh {
+ b.flush()
}
}()
- return b, err
+ return b, b.open()
+}
+
+func (b *block) options(ctx context.Context) {
+ var options DatabaseOpts
+ o := ctx.Value(optionsKey)
+ if o != nil {
+ options = o.(DatabaseOpts)
+ }
+ if options.EncodingMethod.EncoderPool == nil {
+ options.EncodingMethod.EncoderPool = encoding.NewPlainEncoderPool(0)
+ }
+ if options.EncodingMethod.DecoderPool == nil {
+ options.EncodingMethod.DecoderPool = encoding.NewPlainDecoderPool(0)
+ }
+ b.encodingMethod = options.EncodingMethod
+ if options.BlockMemSize < 1 {
+ b.memSize = defaultMainMemorySize
+ } else {
+ b.memSize = options.BlockMemSize
+ }
+ b.lsmMemSize = b.memSize / 8
+ if b.lsmMemSize < defaultKVMemorySize {
+ b.lsmMemSize = defaultKVMemorySize
+ }
}
func (b *block) open() (err error) {
- b.lock.Lock()
- defer b.lock.Unlock()
- if !b.closed.Load() {
- return nil
- }
- b.ref = z.NewCloser(1)
- b.flushCh = run.NewChan(make(chan struct{}))
- b.flushChQueue <- b.flushCh
if b.store, err = kv.OpenTimeSeriesStore(
0,
path.Join(b.path, componentMain),
kv.TSSWithEncoding(b.encodingMethod.EncoderPool, b.encodingMethod.DecoderPool),
kv.TSSWithLogger(b.l.Named(componentMain)),
+ kv.TSSWithMemTableSize(b.memSize),
kv.TSSWithFlushCallback(func() {
- b.flushCh.Write(struct{}{})
+ b.flushCh <- struct{}{}
}),
); err != nil {
return err
@@ -150,33 +159,76 @@
return err
}
if b.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{
- Path: path.Join(b.path, componentSecondLSMIdx),
- Logger: b.l.Named(componentSecondLSMIdx),
+ Path: path.Join(b.path, componentSecondLSMIdx),
+ Logger: b.l.Named(componentSecondLSMIdx),
+ MemTableSize: b.lsmMemSize,
}); err != nil {
return err
}
b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
+ b.ref.Store(0)
b.closed.Store(false)
-
return nil
}
-func (b *block) delegate() blockDelegate {
- if b.isClosed() {
- return nil
+func (b *block) delegate() (blockDelegate, error) {
+ if b.incRef() {
+ return &bDelegate{
+ delegate: b,
+ }, nil
+ }
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ b.queue.Push(BlockID{
+ BlockID: b.blockID,
+ SegID: b.segID,
+ })
+ // TODO: remove the block which fails to open from the queue
+ err := b.open()
+ if err != nil {
+ b.l.Error().Err(err).Stringer("block", b).Msg("fail to open block")
+ return nil, err
}
b.incRef()
return &bDelegate{
delegate: b,
+ }, nil
+}
+
+func (b *block) incRef() bool {
+loop:
+ if b.Closed() {
+ return false
}
+ r := b.ref.Load()
+ if b.ref.CompareAndSwap(r, r+1) {
+ return true
+ }
+ runtime.Gosched()
+ goto loop
}
-func (b *block) dscRef() {
- b.ref.Done()
+func (b *block) Done() {
+loop:
+ r := b.ref.Load()
+ if r < 1 {
+ return
+ }
+ if b.ref.CompareAndSwap(r, r-1) {
+ return
+ }
+ runtime.Gosched()
+ goto loop
}
-func (b *block) incRef() {
- b.ref.AddRunning(1)
+func (b *block) waitDone() {
+loop:
+ if b.ref.Load() < 1 {
+ b.ref.Store(0)
+ return
+ }
+ runtime.Gosched()
+ goto loop
}
func (b *block) flush() {
@@ -193,28 +245,27 @@
func (b *block) close() {
b.lock.Lock()
defer b.lock.Unlock()
- if b.isClosed() {
- return
- }
- b.dscRef()
- b.ref.SignalAndWait()
+ b.closed.Store(true)
+ b.waitDone()
for _, closer := range b.closableLst {
_ = closer.Close()
}
- b.closed.Store(true)
- b.flushCh.Close()
}
-func (b *block) isClosed() bool {
+func (b *block) Closed() bool {
return b.closed.Load()
}
func (b *block) String() string {
- return b.Reporter.String()
+ return fmt.Sprintf("BlockID-%d-%d", b.segID, b.blockID)
}
func (b *block) stats() (names []string, stats []observability.Statistics) {
names = append(names, componentMain, componentSecondInvertedIdx, componentSecondLSMIdx)
+ if b.Closed() {
+ stats = make([]observability.Statistics, 3)
+ return
+ }
stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
return names, stats
}
@@ -290,6 +341,6 @@
}
func (d *bDelegate) Close() error {
- d.delegate.dscRef()
+ d.delegate.Done()
return nil
}
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 3cd6531..cf928d6 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -18,7 +18,9 @@
package bucket
import (
- "sync"
+ "fmt"
+ "math"
+ "sync/atomic"
"github.com/pkg/errors"
"go.uber.org/multierr"
@@ -34,14 +36,12 @@
type Ratio float64
type Strategy struct {
- optionsErr error
- ratio Ratio
- ctrl Controller
- current Reporter
- next Reporter
- mux sync.Mutex
- logger *logger.Logger
- stopCh chan struct{}
+ optionsErr error
+ ratio Ratio
+ ctrl Controller
+ current atomic.Value
+ currentRatio uint64
+ logger *logger.Logger
}
type StrategyOptions func(*Strategy)
@@ -68,9 +68,8 @@
return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
}
strategy := &Strategy{
- ctrl: ctrl,
- ratio: 0.8,
- stopCh: make(chan struct{}),
+ ctrl: ctrl,
+ ratio: 0.8,
}
for _, opt := range options {
opt(strategy)
@@ -85,52 +84,50 @@
}
func (s *Strategy) Run() {
- reset := func() {
- for s.current == nil {
- s.current = s.ctrl.Current()
- }
- s.next = nil
+ for s.current.Load() == nil {
+ s.current.Store(s.ctrl.Current())
}
- reset()
go func(s *Strategy) {
- var err error
- bucket:
- c := s.current.Report()
for {
- select {
- case status, closed := <-c:
- if !closed {
- reset()
- goto bucket
- }
- ratio := Ratio(status.Volume) / Ratio(status.Capacity)
- if ratio >= s.ratio && s.next == nil {
- s.next, err = s.ctrl.Next()
- if errors.Is(err, ErrNoMoreBucket) {
- return
- }
- if err != nil {
- s.logger.Err(err).Msg("failed to create the next bucket")
- }
- }
- if ratio >= 1.0 {
- s.mux.Lock()
- s.ctrl.OnMove(s.current, s.next)
- s.current = s.next
- s.next = nil
- s.mux.Unlock()
- goto bucket
- }
- case <-s.stopCh:
- return
- }
+ c := s.current.Load().(Reporter).Report()
+ s.observe(c)
}
}(s)
}
+func (s *Strategy) String() string {
+ c := s.current.Load()
+ if c == nil {
+ return "nil"
+ }
+ return fmt.Sprintf("%s:%f", c.(Reporter).String(),
+ math.Float64frombits(atomic.LoadUint64(&s.currentRatio)))
+}
+
+func (s *Strategy) observe(c Channel) {
+ var err error
+ var next Reporter
+ moreBucket := true
+ for status := range c {
+ ratio := Ratio(status.Volume) / Ratio(status.Capacity)
+ atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(ratio)))
+ if ratio >= s.ratio && next == nil && moreBucket {
+ next, err = s.ctrl.Next()
+ if errors.Is(err, ErrNoMoreBucket) {
+ moreBucket = false
+ } else if err != nil {
+ s.logger.Err(err).Msg("failed to create the next bucket")
+ }
+ }
+ if ratio >= 1.0 {
+ s.ctrl.OnMove(s.current.Load().(Reporter), next)
+ if next != nil {
+ s.current.Store(next)
+ }
+ return
+ }
+ }
+}
+
func (s *Strategy) Close() {
- close(s.stopCh)
- s.mux.Lock()
- defer s.mux.Unlock()
- s.ctrl.OnMove(s.current, nil)
}
diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go
index 510707c..b73c044 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -123,18 +123,18 @@
type reporter struct {
capacity int
- volume int
step int
}
func (r *reporter) Report() bucket.Channel {
ch := make(bucket.Channel)
go func() {
+ var volume int
for i := 0; i < r.capacity; i++ {
- r.volume += r.step
+ volume += r.step
ch <- bucket.Status{
Capacity: r.capacity,
- Volume: r.volume,
+ Volume: volume,
}
}
close(ch)
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 3f94c25..8b0dcc9 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -33,7 +33,6 @@
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
- "github.com/apache/skywalking-banyandb/pkg/run"
)
type CallbackFn func()
@@ -52,18 +51,20 @@
}
type WriterOptions struct {
- ShardNum uint32
- Families []*databasev1.TagFamilySpec
- IndexRules []*databasev1.IndexRule
- DB tsdb.Supplier
+ ShardNum uint32
+ Families []*databasev1.TagFamilySpec
+ IndexRules []*databasev1.IndexRule
+ DB tsdb.Supplier
+ EnableGlobalIndex bool
}
type Writer struct {
- l *logger.Logger
- db tsdb.Supplier
- shardNum uint32
- ch *run.Chan[Message]
- indexRuleIndex []*partition.IndexRuleLocator
+ l *logger.Logger
+ db tsdb.Supplier
+ shardNum uint32
+ enableGlobalIndex bool
+ ch chan Message
+ indexRuleIndex []*partition.IndexRuleLocator
}
func NewWriter(ctx context.Context, options WriterOptions) *Writer {
@@ -76,29 +77,26 @@
}
w.shardNum = options.ShardNum
w.db = options.DB
+ w.enableGlobalIndex = options.EnableGlobalIndex
w.indexRuleIndex = partition.ParseIndexRuleLocators(options.Families, options.IndexRules)
- w.ch = run.NewChan[Message](make(chan Message))
+ w.ch = make(chan Message)
w.bootIndexGenerator()
return w
}
func (s *Writer) Write(value Message) {
go func(m Message) {
- s.ch.Write(m)
+ s.ch <- m
}(value)
}
func (s *Writer) Close() error {
- return s.ch.Close()
+ return nil
}
func (s *Writer) bootIndexGenerator() {
go func() {
- for {
- m, more := s.ch.Read()
- if !more {
- return
- }
+ for m := range s.ch {
var err error
for _, ruleIndex := range s.indexRuleIndex {
rule := ruleIndex.Rule
@@ -106,6 +104,10 @@
case databasev1.IndexRule_LOCATION_SERIES:
err = multierr.Append(err, writeLocalIndex(m.LocalWriter, ruleIndex, m.Value))
case databasev1.IndexRule_LOCATION_GLOBAL:
+ if !s.enableGlobalIndex {
+ s.l.Warn().Stringer("index-rule", ruleIndex.Rule).Msg("global index is disabled")
+ continue
+ }
err = multierr.Append(err, s.writeGlobalIndex(m.Scope, ruleIndex, m.LocalWriter.ItemID(), m.Value))
}
}
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index a45f57d..a97cddf 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -81,7 +81,7 @@
segStats.MaxMemBytes += segStat.MaxMemBytes
segStats.MemBytes += segStat.MemBytes
for _, b := range seg.blockController.blocks() {
- if b.closed.Load() {
+ if b.Closed() {
continue
}
names, bss := b.stats()
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index a758947..453548e 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -20,6 +20,7 @@
import (
"context"
"errors"
+ "fmt"
"strconv"
"sync"
"time"
@@ -77,9 +78,25 @@
if err != nil {
return nil, err
}
- if s.globalIndex, err = kv.OpenStore(0, indexPath, kv.StoreWithLogger(s.l)); err != nil {
- return nil, err
+ o := ctx.Value(optionsKey)
+ if o != nil {
+ options := o.(DatabaseOpts)
+ if options.EnableGlobalIndex {
+ memSize := options.GlobalIndexMemSize
+ if memSize == 0 {
+ memSize = defaultKVMemorySize
+ }
+ if s.globalIndex, err = kv.OpenStore(
+ 0,
+ indexPath,
+ kv.StoreWithLogger(s.l),
+ kv.StoreWithMemTableSize(memSize),
+ ); err != nil {
+ return nil, err
+ }
+ }
}
+
s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
if err != nil {
return nil, err
@@ -90,7 +107,9 @@
func (s *segment) close() {
s.blockController.close()
- s.globalIndex.Close()
+ if s.globalIndex != nil {
+ s.globalIndex.Close()
+ }
s.Stop()
}
@@ -99,10 +118,13 @@
}
func (s segment) String() string {
- return s.Reporter.String()
+ return fmt.Sprintf("SegID-%d", s.id)
}
func (s *segment) Stats() observability.Statistics {
+ if s.globalIndex == nil {
+ return observability.Statistics{}
+ }
return s.globalIndex.Stats()
}
@@ -159,7 +181,6 @@
if errors.Is(err, ErrEndOfSegment) {
return nil, bucket.ErrNoMoreBucket
}
- err = reporter.open()
if err != nil {
return nil, err
}
@@ -201,18 +222,30 @@
panic("invalid interval unit")
}
-func (bc *blockController) span(timeRange timestamp.TimeRange) (bb []*block) {
- return bc.ensureBlockOpen(bc.search(func(b *block) bool {
+func (bc *blockController) span(timeRange timestamp.TimeRange) ([]blockDelegate, error) {
+ bb := bc.search(func(b *block) bool {
return b.Overlapping(timeRange)
- }))
+ })
+ if bb == nil {
+ return nil, nil
+ }
+ dd := make([]blockDelegate, len(bb))
+ for i, b := range bb {
+ d, err := b.delegate()
+ if err != nil {
+ return nil, err
+ }
+ dd[i] = d
+ }
+ return dd, nil
}
-func (bc *blockController) get(blockID uint16) *block {
+func (bc *blockController) get(blockID uint16) (blockDelegate, error) {
b := bc.getBlock(blockID)
if b != nil {
- return bc.ensureBlockOpen([]*block{b})[0]
+ return b.delegate()
}
- return nil
+ return nil, nil
}
func (bc *blockController) getBlock(blockID uint16) *block {
@@ -250,26 +283,6 @@
return bb
}
-func (bc *blockController) ensureBlockOpen(blocks []*block) (openedBlocks []*block) {
- if blocks == nil {
- return nil
- }
- for _, b := range blocks {
- if b.isClosed() {
- if err := b.open(); err != nil {
- bc.l.Error().Err(err).Stringer("block", b).Msg("fail to open block")
- continue
- }
- }
- openedBlocks = append(openedBlocks, b)
- bc.blockQueue.Push(BlockID{
- BlockID: b.blockID,
- SegID: b.segID,
- })
- }
- return openedBlocks
-}
-
func (bc *blockController) closeBlock(blockID uint16) {
bc.RLock()
defer bc.RUnlock()
@@ -309,13 +322,10 @@
return err
}
if bc.Current() == nil {
- b, err := bc.create(bc.clock.Now())
+ _, err := bc.create(bc.clock.Now())
if err != nil {
return err
}
- if err = b.open(); err != nil {
- return err
- }
}
return nil
}
@@ -351,6 +361,7 @@
startTime: starTime,
suffix: suffix,
blockSize: bc.blockSize,
+ queue: bc.blockQueue,
}); err != nil {
return nil, err
}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index bed9555..1696616 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -95,7 +95,10 @@
}
func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
- b := s.blockDB.block(id)
+ b, err := s.blockDB.block(id)
+ if err != nil {
+ return nil, nil, err
+ }
if b == nil {
return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id)
}
@@ -111,7 +114,10 @@
}
func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
- blocks := s.blockDB.span(timeRange)
+ blocks, err := s.blockDB.span(timeRange)
+ if err != nil {
+ return nil, err
+ }
if len(blocks) < 1 {
return nil, ErrEmptySeriesSpan
}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index e2a6ef8..9ad7cbe 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -132,8 +132,8 @@
type blockDatabase interface {
shardID() common.ShardID
- span(timeRange timestamp.TimeRange) []blockDelegate
- block(id GlobalItemID) blockDelegate
+ span(timeRange timestamp.TimeRange) ([]blockDelegate, error)
+ block(id GlobalItemID) (blockDelegate, error)
}
var (
@@ -172,16 +172,12 @@
return newSeries(s.context(), id, s), nil
}
-func (s *seriesDB) block(id GlobalItemID) blockDelegate {
+func (s *seriesDB) block(id GlobalItemID) (blockDelegate, error) {
seg := s.segCtrl.get(id.segID)
if seg == nil {
- return nil
+ return nil, nil
}
- block := seg.blockController.get(id.blockID)
- if block == nil {
- return nil
- }
- return block.delegate()
+ return seg.blockController.get(id.blockID)
}
func (s *seriesDB) shardID() common.ShardID {
@@ -238,18 +234,20 @@
return result, err
}
-func (s *seriesDB) span(timeRange timestamp.TimeRange) []blockDelegate {
+func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]blockDelegate, error) {
// TODO: return correct blocks
result := make([]blockDelegate, 0)
for _, s := range s.segCtrl.span(timeRange) {
- for _, b := range s.blockController.span(timeRange) {
- bd := b.delegate()
- if bd != nil {
- result = append(result, bd)
- }
+ dd, err := s.blockController.span(timeRange)
+ if err != nil {
+ return nil, err
}
+ if dd == nil {
+ continue
+ }
+ result = append(result, dd...)
}
- return result
+ return result, nil
}
func (s *seriesDB) context() context.Context {
@@ -270,8 +268,23 @@
segCtrl: segCtrl,
l: logger.Fetch(ctx, "series_database"),
}
+ o := ctx.Value(optionsKey)
+ var memSize int64
+ if o != nil {
+ options := o.(DatabaseOpts)
+ if options.SeriesMemSize > 0 {
+ memSize = options.SeriesMemSize
+ } else {
+ memSize = defaultKVMemorySize
+ }
+ } else {
+ memSize = defaultKVMemorySize
+ }
var err error
- sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
+ sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md",
+ kv.StoreWithNamedLogger("metadata", sdb.l),
+ kv.StoreWithMemTableSize(memSize),
+ )
if err != nil {
return nil, err
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index a318a80..831dde3 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -19,6 +19,7 @@
import (
"context"
+ "sort"
"strconv"
"sync"
"time"
@@ -31,7 +32,10 @@
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
-const defaultBlockQueueSize = 1 << 4
+const (
+ defaultBlockQueueSize = 1 << 4
+ defaultKVMemorySize = 1 << 20
+)
var _ Shard = (*shard)(nil)
@@ -118,7 +122,9 @@
}
func (s *shard) State() (shardState ShardState) {
+ shardState.StrategyManagers = append(shardState.StrategyManagers, s.segmentManageStrategy.String())
for _, seg := range s.segmentController.segments() {
+ shardState.StrategyManagers = append(shardState.StrategyManagers, seg.blockManageStrategy.String())
for _, b := range seg.blockController.blocks() {
shardState.Blocks = append(shardState.Blocks, BlockState{
ID: BlockID{
@@ -126,7 +132,7 @@
BlockID: b.blockID,
},
TimeRange: b.TimeRange,
- Closed: b.isClosed(),
+ Closed: b.Closed(),
})
}
}
@@ -135,6 +141,15 @@
for i, v := range s.segmentController.blockQueue.All() {
shardState.OpenBlocks[i] = v.(BlockID)
}
+ sort.Slice(shardState.OpenBlocks, func(i, j int) bool {
+ x := shardState.OpenBlocks[i]
+ y := shardState.OpenBlocks[j]
+ if x.SegID == y.SegID {
+ return x.BlockID < y.BlockID
+ }
+ return x.SegID < y.SegID
+ })
+ s.l.Info().Interface("", shardState).Msg("state")
return shardState
}
@@ -295,7 +310,6 @@
event := sc.l.Info()
if prev != nil {
event.Stringer("prev", prev)
- prev.(*segment).blockManageStrategy.Close()
}
if next != nil {
event.Stringer("next", next)
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 7ef3839..6571d53 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -45,6 +45,7 @@
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
+ GinkgoWriter.Printf("shard state:%+v \n", shard.State())
shard.Close()
deferFn()
})
@@ -67,7 +68,7 @@
t1 := clock.Now()
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -80,8 +81,11 @@
clock.Add(10 * time.Hour)
t2 := clock.Now().Add(2 * time.Hour)
Eventually(func() []tsdb.BlockState {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/01 10:00 has been triggered")
+ }
return shard.State().Blocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -99,12 +103,15 @@
}))
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{}))
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
By("01/01 13:00 moves to the 2nd block")
clock.Add(3 * time.Hour)
Eventually(func() []tsdb.BlockID {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/01 13:00 has been triggered")
+ }
return shard.State().OpenBlocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -114,8 +121,11 @@
clock.Add(9 * time.Hour)
t3 := clock.Now().Add(2 * time.Hour)
Eventually(func() []tsdb.BlockState {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/01 22:00 has been triggered")
+ }
return shard.State().Blocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -141,8 +151,11 @@
By("01/02 01:00 moves to 3rd block")
clock.Add(3 * time.Hour)
Eventually(func() []tsdb.BlockID {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/02 01:00 has been triggered")
+ }
return shard.State().OpenBlocks
- }).Should(Equal([]tsdb.BlockID{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -156,8 +169,11 @@
clock.Add(9 * time.Hour)
t4 := clock.Now().Add(2 * time.Hour)
Eventually(func() []tsdb.BlockState {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/02 10:00 has been triggered")
+ }
return shard.State().Blocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -190,8 +206,15 @@
By("01/02 13:00 moves to 4th block")
clock.Add(3 * time.Hour)
Eventually(func() []tsdb.BlockID {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/02 13:00 has been triggered")
+ }
return shard.State().OpenBlocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+ },
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -200,17 +223,16 @@
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
},
- {
- SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
- BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
- },
}))
By("01/02 22:00 5th block is opened")
clock.Add(9 * time.Hour)
t5 := clock.Now().Add(2 * time.Hour)
Eventually(func() []tsdb.BlockState {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/02 22:00 has been triggered")
+ }
return shard.State().Blocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -250,8 +272,15 @@
By("01/03 01:00 close 1st block by adding 5th block")
clock.Add(3 * time.Hour)
Eventually(func() []tsdb.BlockID {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/03 01:00 has been triggered")
+ }
return shard.State().OpenBlocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -260,14 +289,10 @@
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
},
- {
- SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
- BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
- },
}))
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -318,23 +343,23 @@
Expect(err).NotTo(HaveOccurred())
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
- {
- SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
- BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
- },
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
},
{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ {
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
- BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
},
}))
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -348,7 +373,6 @@
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
},
TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
- Closed: true,
},
{
ID: tsdb.BlockID{
@@ -356,6 +380,7 @@
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
},
TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+ Closed: true,
},
{
ID: tsdb.BlockID{
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 20d8e6e..2f77874 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -59,10 +59,10 @@
ErrInvalidShardID = errors.New("invalid shard id")
ErrOpenDatabase = errors.New("fails to open the database")
- encodingMethodKey = contextEncodingMethodKey{}
+ optionsKey = contextOptionsKey{}
)
-type contextEncodingMethodKey struct{}
+type contextOptionsKey struct{}
type Supplier interface {
SupplyTSDB() Database
@@ -84,11 +84,15 @@
var _ Database = (*database)(nil)
type DatabaseOpts struct {
- Location string
- ShardNum uint32
- EncodingMethod EncodingMethod
- SegmentSize IntervalRule
- BlockSize IntervalRule
+ Location string
+ ShardNum uint32
+ EncodingMethod EncodingMethod
+ SegmentSize IntervalRule
+ BlockSize IntervalRule
+ BlockMemSize int64
+ SeriesMemSize int64
+ EnableGlobalIndex bool
+ GlobalIndexMemSize int64
}
type EncodingMethod struct {
@@ -111,8 +115,9 @@
Closed bool
}
type ShardState struct {
- Blocks []BlockState
- OpenBlocks []BlockID
+ Blocks []BlockState
+ OpenBlocks []BlockID
+ StrategyManagers []string
}
type database struct {
@@ -186,7 +191,7 @@
return nil, errors.Wrap(err, "failed to read directory contents failed")
}
thisContext := context.WithValue(ctx, logger.ContextKey, db.logger)
- thisContext = context.WithValue(thisContext, encodingMethodKey, opts.EncodingMethod)
+ thisContext = context.WithValue(thisContext, optionsKey, opts)
if len(entries) > 0 {
return loadDatabase(thisContext, db)
}
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index 97c11f2..cd11765 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -26,7 +26,7 @@
"github.com/apache/skywalking-banyandb/pkg/logger"
)
-var defaultEventallyTimeout = 30 * time.Second
+var defaultEventuallyTimeout = 30 * time.Second
func TestTsdb(t *testing.T) {
RegisterFailHandler(Fail)
diff --git a/dist/LICENSE b/dist/LICENSE
index a8afea8..08d2763 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -256,7 +256,7 @@
github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3 BSD-3-Clause
github.com/pmezard/go-difflib v1.0.0 BSD-3-Clause
github.com/spf13/pflag v1.0.5 BSD-3-Clause
- golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e BSD-3-Clause
+ golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 BSD-3-Clause
golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f BSD-3-Clause
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 BSD-3-Clause
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 BSD-3-Clause
@@ -282,7 +282,7 @@
MIT licenses
========================================================================
- github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8 MIT
+ github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 MIT
github.com/beorn7/perks v1.0.1 MIT
github.com/cespare/xxhash v1.1.0 MIT
github.com/cespare/xxhash/v2 v2.1.2 MIT
@@ -305,7 +305,7 @@
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 MIT
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 MIT
go.etcd.io/bbolt v1.3.6 MIT
- go.uber.org/atomic v1.9.0 MIT
+ go.uber.org/atomic v1.7.0 MIT
go.uber.org/multierr v1.8.0 MIT
go.uber.org/zap v1.17.0 MIT
gopkg.in/natefinch/lumberjack.v2 v2.0.0 MIT
diff --git a/go.mod b/go.mod
index 3b5b9d7..1cecc88 100644
--- a/go.mod
+++ b/go.mod
@@ -8,15 +8,20 @@
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/badger/v3 v3.2011.1
github.com/dgraph-io/ristretto v0.1.0
+ github.com/envoyproxy/protoc-gen-validate v0.1.0
+ github.com/go-chi/chi/v5 v5.0.7
github.com/golang/mock v1.6.0
- github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8
github.com/google/uuid v1.3.0
+ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
+ github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3
+ github.com/hashicorp/golang-lru v0.5.4
github.com/klauspost/compress v1.15.6
github.com/oklog/run v1.1.0
github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0
github.com/pkg/errors v0.9.1
+ github.com/prometheus/client_golang v1.12.2
github.com/rs/zerolog v1.26.1
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
@@ -25,25 +30,14 @@
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.uber.org/multierr v1.8.0
- golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
+ golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f
+ golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3
google.golang.org/genproto v0.0.0-20220615141314-f1464d18c36b
google.golang.org/grpc v1.47.0
google.golang.org/protobuf v1.28.0
)
require (
- github.com/envoyproxy/protoc-gen-validate v0.1.0
- github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3
- github.com/hashicorp/golang-lru v0.5.4
- github.com/prometheus/client_golang v1.12.2
- go.uber.org/atomic v1.9.0
- golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f
- golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3
-)
-
-require golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
-
-require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
@@ -53,15 +47,14 @@
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
- github.com/go-chi/chi/v5 v5.0.7
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
+ github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
- github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
@@ -106,11 +99,14 @@
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
+ go.uber.org/atomic v1.7.0 // indirect
go.uber.org/zap v1.17.0 // indirect
- golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
+ golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
+ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
+ golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
@@ -119,6 +115,6 @@
)
replace (
- github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8
+ github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97
github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4
)
diff --git a/go.sum b/go.sum
index d37cd1d..b1b19e4 100644
--- a/go.sum
+++ b/go.sum
@@ -50,8 +50,8 @@
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4 h1:iLwRXI6WHBMb2VkWrlYKIFngPKwgs2OnjliXdMB5DY0=
github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4/go.mod h1:Q0luV7nB94o3Bl4hYqAPy03+QTtLxs9pWdUEQb0i0K0=
-github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8 h1:TK3KN7H7ROhT0/sfY8JWWa5xHOo5jUqW7Stc/VxNJyA=
-github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
+github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -512,9 +512,8 @@
go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
-go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
-go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
@@ -536,8 +535,8 @@
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
-golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=
+golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -577,8 +576,6 @@
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
-golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
-golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 192a94f..b3eb099 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -46,15 +46,8 @@
}
func (f *FieldKey) Unmarshal(raw []byte) error {
- switch len(raw) {
- case 12:
- f.SeriesID = common.SeriesID(convert.BytesToUint64(raw[0:8]))
- f.IndexRuleID = convert.BytesToUint32(raw[8:])
- case 4:
- f.IndexRuleID = convert.BytesToUint32(raw)
- default:
- return errors.Wrap(ErrMalformed, "unmarshal field key")
- }
+ f.SeriesID = common.SeriesID(convert.BytesToUint64(raw[0:8]))
+ f.IndexRuleID = convert.BytesToUint32(raw[8:])
return nil
}
@@ -72,14 +65,17 @@
}
func (f *Field) Unmarshal(raw []byte) error {
+ if len(raw) < 13 {
+ return errors.WithMessagef(ErrMalformed, "malformed field: expected more than 12, got %d", len(raw))
+ }
fk := &f.Key
- err := fk.Unmarshal(raw[:len(raw)-8])
+ err := fk.Unmarshal(raw[:12])
if err != nil {
return errors.Wrap(err, "unmarshal a field")
}
- termID := raw[len(raw)-8:]
- f.Term = make([]byte, len(termID))
- copy(f.Term, termID)
+ term := raw[12:]
+ f.Term = make([]byte, len(term))
+ copy(f.Term, term)
return nil
}
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 53de61b..e903098 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -55,14 +55,19 @@
}
type StoreOpts struct {
- Path string
- Logger *logger.Logger
+ Path string
+ Logger *logger.Logger
+ MemTableSize int64
}
func NewStore(opts StoreOpts) (index.Store, error) {
var err error
var lsm kv.Store
- if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil {
+ if lsm, err = kv.OpenStore(
+ 0,
+ opts.Path+"/lsm",
+ kv.StoreWithLogger(opts.Logger),
+ kv.StoreWithMemTableSize(opts.MemTableSize)); err != nil {
return nil, err
}
return &store{
diff --git a/pkg/run/channel.go b/pkg/run/channel.go
deleted file mode 100644
index 82a9299..0000000
--- a/pkg/run/channel.go
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package run
-
-import (
- "sync"
-)
-
-type Chan[T any] struct {
- ch chan T
- closer sync.WaitGroup
-}
-
-func NewChan[T any](ch chan T) *Chan[T] {
- return &Chan[T]{
- ch: ch,
- }
-}
-
-func (c *Chan[T]) Write(item T) {
- c.closer.Add(1)
- defer c.closer.Done()
- c.ch <- item
-}
-
-func (c *Chan[T]) Read() (T, bool) {
- item, more := <-c.ch
- return item, more
-}
-
-func (c *Chan[T]) Close() error {
- c.closer.Wait()
- close(c.ch)
- return nil
-}
diff --git a/pkg/test/helpers/fail_interceptor.go b/pkg/test/helpers/fail_interceptor.go
index a442d27..b971088 100644
--- a/pkg/test/helpers/fail_interceptor.go
+++ b/pkg/test/helpers/fail_interceptor.go
@@ -17,8 +17,9 @@
package helpers
import (
+ "sync/atomic"
+
"github.com/onsi/gomega/types"
- "go.uber.org/atomic"
)
type FailInterceptor struct {
@@ -29,7 +30,7 @@
func NewFailInterceptor(fail types.GomegaFailHandler) *FailInterceptor {
return &FailInterceptor{
ginkgoFail: fail,
- didFail: atomic.NewBool(false),
+ didFail: &atomic.Bool{},
}
}
diff --git a/pkg/timestamp/clock.go b/pkg/timestamp/clock.go
index 92b845a..322747a 100644
--- a/pkg/timestamp/clock.go
+++ b/pkg/timestamp/clock.go
@@ -34,6 +34,8 @@
Add(d time.Duration)
// Set sets the current time of the mock clock to a specific one.
Set(t time.Time)
+ // TriggerTimer sends the current time to timer.C
+ TriggerTimer() bool
}
// NewClock returns an instance of a real-time clock.