Parameterize memory size and some key improvements (#153)

* Parameterize memory size
* Fix checking failure
* Extract default values as several constants
* Print strategy manager state after testing
* Add a mock clock trigger to send the "now" to "ticker.C" in each
verification round
* Update LICENSE

Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
diff --git a/Makefile b/Makefile
index c888fdb..c1f0de8 100644
--- a/Makefile
+++ b/Makefile
@@ -64,7 +64,7 @@
 include scripts/build/ginkgo.mk
 
 test-ci: $(GINKGO) ## Run the unit tests in CI
-	$(GINKGO) --skip-package=test/stress --race --cover --covermode atomic --coverprofile=coverage.out ./... 
+	$(GINKGO) -v --skip-package=test/stress --race --cover --covermode atomic --coverprofile=coverage.out ./... 
 
 ##@ Code quality targets
 
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 9ae630f..f40d05d 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -126,6 +126,17 @@
 	}
 }
 
+func TSSWithMemTableSize(size int64) TimeSeriesOptions {
+	return func(store TimeSeriesStore) {
+		if size < 1 {
+			return
+		}
+		if btss, ok := store.(*badgerTSS); ok {
+			btss.dbOpts.MemTableSize = size
+		}
+	}
+}
+
 type Iterator interface {
 	Next()
 	Rewind()
@@ -160,6 +171,9 @@
 	}
 	// Put all values into LSM
 	btss.dbOpts = btss.dbOpts.WithVLogPercentile(1.0)
+	if btss.dbOpts.MemTableSize < 8<<20 {
+		btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
+	}
 	var err error
 	btss.db, err = badger.Open(btss.dbOpts)
 	if err != nil {
@@ -187,6 +201,18 @@
 	}
 }
 
+// StoreWithMemTableSize sets MemTable size
+func StoreWithMemTableSize(size int64) StoreOptions {
+	return func(store Store) {
+		if size < 1 {
+			return
+		}
+		if bdb, ok := store.(*badgerDB); ok {
+			bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
+		}
+	}
+}
+
 // OpenStore creates a new Store
 func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) {
 	bdb := new(badgerDB)
@@ -196,6 +222,9 @@
 		opt(bdb)
 	}
 	bdb.dbOpts = bdb.dbOpts.WithNumVersionsToKeep(math.MaxUint32)
+	if bdb.dbOpts.MemTableSize > 0 && bdb.dbOpts.MemTableSize < 8<<20 {
+		bdb.dbOpts = bdb.dbOpts.WithValueThreshold(1 << 10)
+	}
 
 	var err error
 	bdb.db, err = badger.Open(bdb.dbOpts)
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2fad3d5..ef235a3 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -39,7 +39,9 @@
 	metadata metadata.Repo
 }
 
-func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, l *logger.Logger) schemaRepo {
+func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo,
+	dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+) schemaRepo {
 	return schemaRepo{
 		l:        l,
 		metadata: metadata,
@@ -47,7 +49,7 @@
 			metadata,
 			repo,
 			l,
-			newSupplier(path, metadata, l),
+			newSupplier(path, metadata, dbOpts, l),
 			event.MeasureTopicShardEvent,
 			event.MeasureTopicEntityEvent,
 		),
@@ -168,13 +170,19 @@
 
 type supplier struct {
 	path     string
+	dbOpts   tsdb.DatabaseOpts
 	metadata metadata.Repo
 	l        *logger.Logger
 }
 
-func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) *supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+	dbOpts.EncodingMethod = tsdb.EncodingMethod{
+		EncoderPool: newEncoderPool(plainChunkSize, intChunkSize, l),
+		DecoderPool: newDecoderPool(plainChunkSize, intChunkSize, l),
+	}
 	return &supplier{
 		path:     path,
+		dbOpts:   dbOpts,
 		metadata: metadata,
 		l:        l,
 	}
@@ -195,17 +203,13 @@
 }
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
+	opts := s.dbOpts
+	opts.ShardNum = groupSchema.ResourceOpts.ShardNum
+	opts.Location = path.Join(s.path, groupSchema.Metadata.Name)
 	return tsdb.OpenDatabase(
 		context.WithValue(context.Background(), common.PositionKey, common.Position{
 			Module:   "measure",
 			Database: groupSchema.Metadata.Name,
 		}),
-		tsdb.DatabaseOpts{
-			Location: path.Join(s.path, groupSchema.Metadata.Name),
-			ShardNum: groupSchema.ResourceOpts.ShardNum,
-			EncodingMethod: tsdb.EncodingMethod{
-				EncoderPool: newEncoderPool(plainChunkSize, intChunkSize, s.l),
-				DecoderPool: newDecoderPool(plainChunkSize, intChunkSize, s.l),
-			},
-		})
+		opts)
 }
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 18239a2..26219d2 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -30,6 +30,7 @@
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 	resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
@@ -50,11 +51,13 @@
 var _ Service = (*service)(nil)
 
 type service struct {
+	root   string
+	dbOpts tsdb.DatabaseOpts
+
 	schemaRepo    schemaRepo
 	writeListener *writeCallback
 	l             *logger.Logger
 	metadata      metadata.Repo
-	root          string
 	pipeline      queue.Queue
 	repo          discovery.ServiceRepo
 	// stop channel for the service
@@ -76,6 +79,8 @@
 func (s *service) FlagSet() *run.FlagSet {
 	flagS := run.NewFlagSet("storage")
 	flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of database")
+	flagS.Int64Var(&s.dbOpts.BlockMemSize, "measure-block-mem-size", 16<<20, "block memory size")
+	flagS.Int64Var(&s.dbOpts.SeriesMemSize, "measure-seriesmeta-mem-size", 1<<20, "series metadata memory size")
 	return flagS
 }
 
@@ -98,7 +103,7 @@
 	if err != nil {
 		return err
 	}
-	s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.l)
+	s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l)
 	for _, g := range groups {
 		if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
 			continue
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 0895b16..172ed7a 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -41,7 +41,9 @@
 	metadata metadata.Repo
 }
 
-func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, l *logger.Logger) schemaRepo {
+func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo,
+	dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+) schemaRepo {
 	return schemaRepo{
 		l:        l,
 		metadata: metadata,
@@ -49,7 +51,7 @@
 			metadata,
 			repo,
 			l,
-			newSupplier(path, metadata, l),
+			newSupplier(path, metadata, dbOpts, l),
 			event.StreamTopicShardEvent,
 			event.StreamTopicEntityEvent,
 		),
@@ -170,13 +172,19 @@
 
 type supplier struct {
 	path     string
+	dbOpts   tsdb.DatabaseOpts
 	metadata metadata.Repo
 	l        *logger.Logger
 }
 
-func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) *supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+	dbOpts.EncodingMethod = tsdb.EncodingMethod{
+		EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
+		DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
+	}
 	return &supplier{
 		path:     path,
+		dbOpts:   dbOpts,
 		metadata: metadata,
 		l:        l,
 	}
@@ -197,17 +205,13 @@
 }
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
+	opts := s.dbOpts
+	opts.ShardNum = groupSchema.ResourceOpts.ShardNum
+	opts.Location = path.Join(s.path, groupSchema.Metadata.Name)
 	return tsdb.OpenDatabase(
 		context.WithValue(context.Background(), common.PositionKey, common.Position{
 			Module:   "stream",
 			Database: groupSchema.Metadata.Name,
 		}),
-		tsdb.DatabaseOpts{
-			Location: path.Join(s.path, groupSchema.Metadata.Name),
-			ShardNum: groupSchema.ResourceOpts.ShardNum,
-			EncodingMethod: tsdb.EncodingMethod{
-				EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
-				DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
-			},
-		})
+		opts)
 }
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index 1109ad4..2a7f555 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -30,6 +30,7 @@
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -49,11 +50,13 @@
 var _ Service = (*service)(nil)
 
 type service struct {
+	root   string
+	dbOpts tsdb.DatabaseOpts
+
 	schemaRepo    schemaRepo
 	writeListener *writeCallback
 	l             *logger.Logger
 	metadata      metadata.Repo
-	root          string
 	pipeline      queue.Queue
 	repo          discovery.ServiceRepo
 	// stop channel for the service
@@ -71,6 +74,9 @@
 func (s *service) FlagSet() *run.FlagSet {
 	flagS := run.NewFlagSet("storage")
 	flagS.StringVar(&s.root, "stream-root-path", "/tmp", "the root path of database")
+	flagS.Int64Var(&s.dbOpts.BlockMemSize, "stream-block-mem-size", 8<<20, "block memory size")
+	flagS.Int64Var(&s.dbOpts.SeriesMemSize, "stream-seriesmeta-mem-size", 1<<20, "series metadata memory size")
+	flagS.Int64Var(&s.dbOpts.GlobalIndexMemSize, "stream-global-index-mem-size", 2<<20, "global index memory size")
 	return flagS
 }
 
@@ -93,7 +99,7 @@
 	if err != nil {
 		return err
 	}
-	s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.l)
+	s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l)
 	for _, g := range groups {
 		if g.Catalog != commonv1.Catalog_CATALOG_STREAM {
 			continue
@@ -149,5 +155,8 @@
 		metadata: metadata,
 		repo:     repo,
 		pipeline: pipeline,
+		dbOpts: tsdb.DatabaseOpts{
+			EnableGlobalIndex: true,
+		},
 	}, nil
 }
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 83854ac..1af3871 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -93,10 +93,11 @@
 
 	sm.db = db
 	sm.indexWriter = index.NewWriter(ctx, index.WriterOptions{
-		DB:         db,
-		ShardNum:   shardNum,
-		Families:   spec.schema.TagFamilies,
-		IndexRules: spec.indexRules,
+		DB:                db,
+		ShardNum:          shardNum,
+		Families:          spec.schema.TagFamilies,
+		IndexRules:        spec.indexRules,
+		EnableGlobalIndex: true,
 	})
 	return sm, nil
 }
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 3419a34..18393c0 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -19,15 +19,15 @@
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"path"
+	"runtime"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
-	"github.com/dgraph-io/ristretto/z"
-	"go.uber.org/atomic"
-
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -37,7 +37,6 @@
 	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
 	"github.com/apache/skywalking-banyandb/pkg/index/lsm"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	"github.com/apache/skywalking-banyandb/pkg/run"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -45,16 +44,21 @@
 	componentMain              = "main"
 	componentSecondInvertedIdx = "inverted"
 	componentSecondLSMIdx      = "lsm"
+
+	defaultMainMemorySize = 8 << 20
 )
 
 type block struct {
-	path     string
-	l        *logger.Logger
-	suffix   string
-	ref      *z.Closer
-	lock     sync.RWMutex
-	closed   *atomic.Bool
-	position common.Position
+	path       string
+	l          *logger.Logger
+	queue      bucket.Queue
+	suffix     string
+	ref        *atomic.Int32
+	closed     *atomic.Bool
+	lock       sync.RWMutex
+	position   common.Position
+	memSize    int64
+	lsmMemSize int64
 
 	store         kv.TimeSeriesStore
 	invertedIndex index.Store
@@ -65,8 +69,7 @@
 	segID          uint16
 	blockID        uint16
 	encodingMethod EncodingMethod
-	flushCh        *run.Chan[struct{}]
-	flushChQueue   chan *run.Chan[struct{}]
+	flushCh        chan struct{}
 }
 
 type blockOpts struct {
@@ -75,6 +78,7 @@
 	startTime time.Time
 	suffix    string
 	path      string
+	queue     bucket.Queue
 }
 
 func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
@@ -84,60 +88,65 @@
 	}
 	id := GenerateInternalID(opts.blockSize.Unit, suffixInteger)
 	timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false)
-	encodingMethodObject := ctx.Value(encodingMethodKey)
-	if encodingMethodObject == nil {
-		encodingMethodObject = EncodingMethod{
-			EncoderPool: encoding.NewPlainEncoderPool(0),
-			DecoderPool: encoding.NewPlainDecoderPool(0),
-		}
-	}
 	clock, _ := timestamp.GetClock(ctx)
 	b = &block{
-		segID:          opts.segID,
-		blockID:        id,
-		path:           opts.path,
-		l:              logger.Fetch(ctx, "block"),
-		TimeRange:      timeRange,
-		Reporter:       bucket.NewTimeBasedReporter(timeRange, clock),
-		closed:         atomic.NewBool(true),
-		encodingMethod: encodingMethodObject.(EncodingMethod),
-		flushChQueue:   make(chan *run.Chan[struct{}]),
+		segID:     opts.segID,
+		blockID:   id,
+		path:      opts.path,
+		l:         logger.Fetch(ctx, "block"),
+		TimeRange: timeRange,
+		Reporter:  bucket.NewTimeBasedReporter(timeRange, clock),
+		flushCh:   make(chan struct{}),
+		ref:       &atomic.Int32{},
+		closed:    &atomic.Bool{},
+		queue:     opts.queue,
 	}
+	b.options(ctx)
 	position := ctx.Value(common.PositionKey)
 	if position != nil {
 		b.position = position.(common.Position)
 	}
 	go func() {
-		for {
-			ch := <-b.flushChQueue
-			for {
-				_, more := ch.Read()
-				if !more {
-					break
-				}
-				b.flush()
-			}
+		for range b.flushCh {
+			b.flush()
 		}
 	}()
-	return b, err
+	return b, b.open()
+}
+
+func (b *block) options(ctx context.Context) {
+	var options DatabaseOpts
+	o := ctx.Value(optionsKey)
+	if o != nil {
+		options = o.(DatabaseOpts)
+	}
+	if options.EncodingMethod.EncoderPool == nil {
+		options.EncodingMethod.EncoderPool = encoding.NewPlainEncoderPool(0)
+	}
+	if options.EncodingMethod.EncoderPool == nil {
+		options.EncodingMethod.DecoderPool = encoding.NewPlainDecoderPool(0)
+	}
+	b.encodingMethod = options.EncodingMethod
+	if options.BlockMemSize < 1 {
+		b.memSize = defaultMainMemorySize
+	} else {
+		b.memSize = options.BlockMemSize
+	}
+	b.lsmMemSize = b.memSize / 8
+	if b.lsmMemSize < defaultKVMemorySize {
+		b.lsmMemSize = defaultKVMemorySize
+	}
 }
 
 func (b *block) open() (err error) {
-	b.lock.Lock()
-	defer b.lock.Unlock()
-	if !b.closed.Load() {
-		return nil
-	}
-	b.ref = z.NewCloser(1)
-	b.flushCh = run.NewChan(make(chan struct{}))
-	b.flushChQueue <- b.flushCh
 	if b.store, err = kv.OpenTimeSeriesStore(
 		0,
 		path.Join(b.path, componentMain),
 		kv.TSSWithEncoding(b.encodingMethod.EncoderPool, b.encodingMethod.DecoderPool),
 		kv.TSSWithLogger(b.l.Named(componentMain)),
+		kv.TSSWithMemTableSize(b.memSize),
 		kv.TSSWithFlushCallback(func() {
-			b.flushCh.Write(struct{}{})
+			b.flushCh <- struct{}{}
 		}),
 	); err != nil {
 		return err
@@ -150,33 +159,76 @@
 		return err
 	}
 	if b.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{
-		Path:   path.Join(b.path, componentSecondLSMIdx),
-		Logger: b.l.Named(componentSecondLSMIdx),
+		Path:         path.Join(b.path, componentSecondLSMIdx),
+		Logger:       b.l.Named(componentSecondLSMIdx),
+		MemTableSize: b.lsmMemSize,
 	}); err != nil {
 		return err
 	}
 	b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
+	b.ref.Store(0)
 	b.closed.Store(false)
-
 	return nil
 }
 
-func (b *block) delegate() blockDelegate {
-	if b.isClosed() {
-		return nil
+func (b *block) delegate() (blockDelegate, error) {
+	if b.incRef() {
+		return &bDelegate{
+			delegate: b,
+		}, nil
+	}
+	b.lock.Lock()
+	defer b.lock.Unlock()
+	b.queue.Push(BlockID{
+		BlockID: b.blockID,
+		SegID:   b.segID,
+	})
+	// TODO: remove the block which fails to open from the queue
+	err := b.open()
+	if err != nil {
+		b.l.Error().Err(err).Stringer("block", b).Msg("fail to open block")
+		return nil, err
 	}
 	b.incRef()
 	return &bDelegate{
 		delegate: b,
+	}, nil
+}
+
+func (b *block) incRef() bool {
+loop:
+	if b.Closed() {
+		return false
 	}
+	r := b.ref.Load()
+	if b.ref.CompareAndSwap(r, r+1) {
+		return true
+	}
+	runtime.Gosched()
+	goto loop
 }
 
-func (b *block) dscRef() {
-	b.ref.Done()
+func (b *block) Done() {
+loop:
+	r := b.ref.Load()
+	if r < 1 {
+		return
+	}
+	if b.ref.CompareAndSwap(r, r-1) {
+		return
+	}
+	runtime.Gosched()
+	goto loop
 }
 
-func (b *block) incRef() {
-	b.ref.AddRunning(1)
+func (b *block) waitDone() {
+loop:
+	if b.ref.Load() < 1 {
+		b.ref.Store(0)
+		return
+	}
+	runtime.Gosched()
+	goto loop
 }
 
 func (b *block) flush() {
@@ -193,28 +245,27 @@
 func (b *block) close() {
 	b.lock.Lock()
 	defer b.lock.Unlock()
-	if b.isClosed() {
-		return
-	}
-	b.dscRef()
-	b.ref.SignalAndWait()
+	b.closed.Store(true)
+	b.waitDone()
 	for _, closer := range b.closableLst {
 		_ = closer.Close()
 	}
-	b.closed.Store(true)
-	b.flushCh.Close()
 }
 
-func (b *block) isClosed() bool {
+func (b *block) Closed() bool {
 	return b.closed.Load()
 }
 
 func (b *block) String() string {
-	return b.Reporter.String()
+	return fmt.Sprintf("BlockID-%d-%d", b.segID, b.blockID)
 }
 
 func (b *block) stats() (names []string, stats []observability.Statistics) {
 	names = append(names, componentMain, componentSecondInvertedIdx, componentSecondLSMIdx)
+	if b.Closed() {
+		stats = make([]observability.Statistics, 3)
+		return
+	}
 	stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
 	return names, stats
 }
@@ -290,6 +341,6 @@
 }
 
 func (d *bDelegate) Close() error {
-	d.delegate.dscRef()
+	d.delegate.Done()
 	return nil
 }
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 3cd6531..cf928d6 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -18,7 +18,9 @@
 package bucket
 
 import (
-	"sync"
+	"fmt"
+	"math"
+	"sync/atomic"
 
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
@@ -34,14 +36,12 @@
 type Ratio float64
 
 type Strategy struct {
-	optionsErr error
-	ratio      Ratio
-	ctrl       Controller
-	current    Reporter
-	next       Reporter
-	mux        sync.Mutex
-	logger     *logger.Logger
-	stopCh     chan struct{}
+	optionsErr   error
+	ratio        Ratio
+	ctrl         Controller
+	current      atomic.Value
+	currentRatio uint64
+	logger       *logger.Logger
 }
 
 type StrategyOptions func(*Strategy)
@@ -68,9 +68,8 @@
 		return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
 	}
 	strategy := &Strategy{
-		ctrl:   ctrl,
-		ratio:  0.8,
-		stopCh: make(chan struct{}),
+		ctrl:  ctrl,
+		ratio: 0.8,
 	}
 	for _, opt := range options {
 		opt(strategy)
@@ -85,52 +84,50 @@
 }
 
 func (s *Strategy) Run() {
-	reset := func() {
-		for s.current == nil {
-			s.current = s.ctrl.Current()
-		}
-		s.next = nil
+	for s.current.Load() == nil {
+		s.current.Store(s.ctrl.Current())
 	}
-	reset()
 	go func(s *Strategy) {
-		var err error
-	bucket:
-		c := s.current.Report()
 		for {
-			select {
-			case status, closed := <-c:
-				if !closed {
-					reset()
-					goto bucket
-				}
-				ratio := Ratio(status.Volume) / Ratio(status.Capacity)
-				if ratio >= s.ratio && s.next == nil {
-					s.next, err = s.ctrl.Next()
-					if errors.Is(err, ErrNoMoreBucket) {
-						return
-					}
-					if err != nil {
-						s.logger.Err(err).Msg("failed to create the next bucket")
-					}
-				}
-				if ratio >= 1.0 {
-					s.mux.Lock()
-					s.ctrl.OnMove(s.current, s.next)
-					s.current = s.next
-					s.next = nil
-					s.mux.Unlock()
-					goto bucket
-				}
-			case <-s.stopCh:
-				return
-			}
+			c := s.current.Load().(Reporter).Report()
+			s.observe(c)
 		}
 	}(s)
 }
 
+func (s *Strategy) String() string {
+	c := s.current.Load()
+	if c == nil {
+		return "nil"
+	}
+	return fmt.Sprintf("%s:%f", c.(Reporter).String(),
+		math.Float64frombits(atomic.LoadUint64(&s.currentRatio)))
+}
+
+func (s *Strategy) observe(c Channel) {
+	var err error
+	var next Reporter
+	moreBucket := true
+	for status := range c {
+		ratio := Ratio(status.Volume) / Ratio(status.Capacity)
+		atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(ratio)))
+		if ratio >= s.ratio && next == nil && moreBucket {
+			next, err = s.ctrl.Next()
+			if errors.Is(err, ErrNoMoreBucket) {
+				moreBucket = false
+			} else if err != nil {
+				s.logger.Err(err).Msg("failed to create the next bucket")
+			}
+		}
+		if ratio >= 1.0 {
+			s.ctrl.OnMove(s.current.Load().(Reporter), next)
+			if next != nil {
+				s.current.Store(next)
+			}
+			return
+		}
+	}
+}
+
 func (s *Strategy) Close() {
-	close(s.stopCh)
-	s.mux.Lock()
-	defer s.mux.Unlock()
-	s.ctrl.OnMove(s.current, nil)
 }
diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go
index 510707c..b73c044 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -123,18 +123,18 @@
 
 type reporter struct {
 	capacity int
-	volume   int
 	step     int
 }
 
 func (r *reporter) Report() bucket.Channel {
 	ch := make(bucket.Channel)
 	go func() {
+		var volume int
 		for i := 0; i < r.capacity; i++ {
-			r.volume += r.step
+			volume += r.step
 			ch <- bucket.Status{
 				Capacity: r.capacity,
-				Volume:   r.volume,
+				Volume:   volume,
 			}
 		}
 		close(ch)
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 3f94c25..8b0dcc9 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -33,7 +33,6 @@
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 type CallbackFn func()
@@ -52,18 +51,20 @@
 }
 
 type WriterOptions struct {
-	ShardNum   uint32
-	Families   []*databasev1.TagFamilySpec
-	IndexRules []*databasev1.IndexRule
-	DB         tsdb.Supplier
+	ShardNum          uint32
+	Families          []*databasev1.TagFamilySpec
+	IndexRules        []*databasev1.IndexRule
+	DB                tsdb.Supplier
+	EnableGlobalIndex bool
 }
 
 type Writer struct {
-	l              *logger.Logger
-	db             tsdb.Supplier
-	shardNum       uint32
-	ch             *run.Chan[Message]
-	indexRuleIndex []*partition.IndexRuleLocator
+	l                 *logger.Logger
+	db                tsdb.Supplier
+	shardNum          uint32
+	enableGlobalIndex bool
+	ch                chan Message
+	indexRuleIndex    []*partition.IndexRuleLocator
 }
 
 func NewWriter(ctx context.Context, options WriterOptions) *Writer {
@@ -76,29 +77,26 @@
 	}
 	w.shardNum = options.ShardNum
 	w.db = options.DB
+	w.enableGlobalIndex = options.EnableGlobalIndex
 	w.indexRuleIndex = partition.ParseIndexRuleLocators(options.Families, options.IndexRules)
-	w.ch = run.NewChan[Message](make(chan Message))
+	w.ch = make(chan Message)
 	w.bootIndexGenerator()
 	return w
 }
 
 func (s *Writer) Write(value Message) {
 	go func(m Message) {
-		s.ch.Write(m)
+		s.ch <- m
 	}(value)
 }
 
 func (s *Writer) Close() error {
-	return s.ch.Close()
+	return nil
 }
 
 func (s *Writer) bootIndexGenerator() {
 	go func() {
-		for {
-			m, more := s.ch.Read()
-			if !more {
-				return
-			}
+		for m := range s.ch {
 			var err error
 			for _, ruleIndex := range s.indexRuleIndex {
 				rule := ruleIndex.Rule
@@ -106,6 +104,10 @@
 				case databasev1.IndexRule_LOCATION_SERIES:
 					err = multierr.Append(err, writeLocalIndex(m.LocalWriter, ruleIndex, m.Value))
 				case databasev1.IndexRule_LOCATION_GLOBAL:
+					if !s.enableGlobalIndex {
+						s.l.Warn().Stringer("index-rule", ruleIndex.Rule).Msg("global index is disabled")
+						continue
+					}
 					err = multierr.Append(err, s.writeGlobalIndex(m.Scope, ruleIndex, m.LocalWriter.ItemID(), m.Value))
 				}
 			}
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index a45f57d..a97cddf 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -81,7 +81,7 @@
 		segStats.MaxMemBytes += segStat.MaxMemBytes
 		segStats.MemBytes += segStat.MemBytes
 		for _, b := range seg.blockController.blocks() {
-			if b.closed.Load() {
+			if b.Closed() {
 				continue
 			}
 			names, bss := b.stats()
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index a758947..453548e 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -20,6 +20,7 @@
 import (
 	"context"
 	"errors"
+	"fmt"
 	"strconv"
 	"sync"
 	"time"
@@ -77,9 +78,25 @@
 	if err != nil {
 		return nil, err
 	}
-	if s.globalIndex, err = kv.OpenStore(0, indexPath, kv.StoreWithLogger(s.l)); err != nil {
-		return nil, err
+	o := ctx.Value(optionsKey)
+	if o != nil {
+		options := o.(DatabaseOpts)
+		if options.EnableGlobalIndex {
+			memSize := options.GlobalIndexMemSize
+			if memSize == 0 {
+				memSize = defaultKVMemorySize
+			}
+			if s.globalIndex, err = kv.OpenStore(
+				0,
+				indexPath,
+				kv.StoreWithLogger(s.l),
+				kv.StoreWithMemTableSize(memSize),
+			); err != nil {
+				return nil, err
+			}
+		}
 	}
+
 	s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
 	if err != nil {
 		return nil, err
@@ -90,7 +107,9 @@
 
 func (s *segment) close() {
 	s.blockController.close()
-	s.globalIndex.Close()
+	if s.globalIndex != nil {
+		s.globalIndex.Close()
+	}
 	s.Stop()
 }
 
@@ -99,10 +118,13 @@
 }
 
 func (s segment) String() string {
-	return s.Reporter.String()
+	return fmt.Sprintf("SegID-%d", s.id)
 }
 
 func (s *segment) Stats() observability.Statistics {
+	if s.globalIndex == nil {
+		return observability.Statistics{}
+	}
 	return s.globalIndex.Stats()
 }
 
@@ -159,7 +181,6 @@
 	if errors.Is(err, ErrEndOfSegment) {
 		return nil, bucket.ErrNoMoreBucket
 	}
-	err = reporter.open()
 	if err != nil {
 		return nil, err
 	}
@@ -201,18 +222,30 @@
 	panic("invalid interval unit")
 }
 
-func (bc *blockController) span(timeRange timestamp.TimeRange) (bb []*block) {
-	return bc.ensureBlockOpen(bc.search(func(b *block) bool {
+func (bc *blockController) span(timeRange timestamp.TimeRange) ([]blockDelegate, error) {
+	bb := bc.search(func(b *block) bool {
 		return b.Overlapping(timeRange)
-	}))
+	})
+	if bb == nil {
+		return nil, nil
+	}
+	dd := make([]blockDelegate, len(bb))
+	for i, b := range bb {
+		d, err := b.delegate()
+		if err != nil {
+			return nil, err
+		}
+		dd[i] = d
+	}
+	return dd, nil
 }
 
-func (bc *blockController) get(blockID uint16) *block {
+func (bc *blockController) get(blockID uint16) (blockDelegate, error) {
 	b := bc.getBlock(blockID)
 	if b != nil {
-		return bc.ensureBlockOpen([]*block{b})[0]
+		return b.delegate()
 	}
-	return nil
+	return nil, nil
 }
 
 func (bc *blockController) getBlock(blockID uint16) *block {
@@ -250,26 +283,6 @@
 	return bb
 }
 
-func (bc *blockController) ensureBlockOpen(blocks []*block) (openedBlocks []*block) {
-	if blocks == nil {
-		return nil
-	}
-	for _, b := range blocks {
-		if b.isClosed() {
-			if err := b.open(); err != nil {
-				bc.l.Error().Err(err).Stringer("block", b).Msg("fail to open block")
-				continue
-			}
-		}
-		openedBlocks = append(openedBlocks, b)
-		bc.blockQueue.Push(BlockID{
-			BlockID: b.blockID,
-			SegID:   b.segID,
-		})
-	}
-	return openedBlocks
-}
-
 func (bc *blockController) closeBlock(blockID uint16) {
 	bc.RLock()
 	defer bc.RUnlock()
@@ -309,13 +322,10 @@
 		return err
 	}
 	if bc.Current() == nil {
-		b, err := bc.create(bc.clock.Now())
+		_, err := bc.create(bc.clock.Now())
 		if err != nil {
 			return err
 		}
-		if err = b.open(); err != nil {
-			return err
-		}
 	}
 	return nil
 }
@@ -351,6 +361,7 @@
 			startTime: starTime,
 			suffix:    suffix,
 			blockSize: bc.blockSize,
+			queue:     bc.blockQueue,
 		}); err != nil {
 		return nil, err
 	}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index bed9555..1696616 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -95,7 +95,10 @@
 }
 
 func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
-	b := s.blockDB.block(id)
+	b, err := s.blockDB.block(id)
+	if err != nil {
+		return nil, nil, err
+	}
 	if b == nil {
 		return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id)
 	}
@@ -111,7 +114,10 @@
 }
 
 func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
-	blocks := s.blockDB.span(timeRange)
+	blocks, err := s.blockDB.span(timeRange)
+	if err != nil {
+		return nil, err
+	}
 	if len(blocks) < 1 {
 		return nil, ErrEmptySeriesSpan
 	}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index e2a6ef8..9ad7cbe 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -132,8 +132,8 @@
 
 type blockDatabase interface {
 	shardID() common.ShardID
-	span(timeRange timestamp.TimeRange) []blockDelegate
-	block(id GlobalItemID) blockDelegate
+	span(timeRange timestamp.TimeRange) ([]blockDelegate, error)
+	block(id GlobalItemID) (blockDelegate, error)
 }
 
 var (
@@ -172,16 +172,12 @@
 	return newSeries(s.context(), id, s), nil
 }
 
-func (s *seriesDB) block(id GlobalItemID) blockDelegate {
+func (s *seriesDB) block(id GlobalItemID) (blockDelegate, error) {
 	seg := s.segCtrl.get(id.segID)
 	if seg == nil {
-		return nil
+		return nil, nil
 	}
-	block := seg.blockController.get(id.blockID)
-	if block == nil {
-		return nil
-	}
-	return block.delegate()
+	return seg.blockController.get(id.blockID)
 }
 
 func (s *seriesDB) shardID() common.ShardID {
@@ -238,18 +234,20 @@
 	return result, err
 }
 
-func (s *seriesDB) span(timeRange timestamp.TimeRange) []blockDelegate {
+func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]blockDelegate, error) {
 	// TODO: return correct blocks
 	result := make([]blockDelegate, 0)
 	for _, s := range s.segCtrl.span(timeRange) {
-		for _, b := range s.blockController.span(timeRange) {
-			bd := b.delegate()
-			if bd != nil {
-				result = append(result, bd)
-			}
+		dd, err := s.blockController.span(timeRange)
+		if err != nil {
+			return nil, err
 		}
+		if dd == nil {
+			continue
+		}
+		result = append(result, dd...)
 	}
-	return result
+	return result, nil
 }
 
 func (s *seriesDB) context() context.Context {
@@ -270,8 +268,23 @@
 		segCtrl: segCtrl,
 		l:       logger.Fetch(ctx, "series_database"),
 	}
+	o := ctx.Value(optionsKey)
+	var memSize int64
+	if o != nil {
+		options := o.(DatabaseOpts)
+		if options.SeriesMemSize > 1 {
+			memSize = options.SeriesMemSize
+		} else {
+			memSize = defaultKVMemorySize
+		}
+	} else {
+		memSize = defaultKVMemorySize
+	}
 	var err error
-	sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
+	sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md",
+		kv.StoreWithNamedLogger("metadata", sdb.l),
+		kv.StoreWithMemTableSize(memSize),
+	)
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index a318a80..831dde3 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -19,6 +19,7 @@
 
 import (
 	"context"
+	"sort"
 	"strconv"
 	"sync"
 	"time"
@@ -31,7 +32,10 @@
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-const defaultBlockQueueSize = 1 << 4
+const (
+	defaultBlockQueueSize = 1 << 4
+	defaultKVMemorySize   = 1 << 20
+)
 
 var _ Shard = (*shard)(nil)
 
@@ -118,7 +122,9 @@
 }
 
 func (s *shard) State() (shardState ShardState) {
+	shardState.StrategyManagers = append(shardState.StrategyManagers, s.segmentManageStrategy.String())
 	for _, seg := range s.segmentController.segments() {
+		shardState.StrategyManagers = append(shardState.StrategyManagers, seg.blockManageStrategy.String())
 		for _, b := range seg.blockController.blocks() {
 			shardState.Blocks = append(shardState.Blocks, BlockState{
 				ID: BlockID{
@@ -126,7 +132,7 @@
 					BlockID: b.blockID,
 				},
 				TimeRange: b.TimeRange,
-				Closed:    b.isClosed(),
+				Closed:    b.Closed(),
 			})
 		}
 	}
@@ -135,6 +141,15 @@
 	for i, v := range s.segmentController.blockQueue.All() {
 		shardState.OpenBlocks[i] = v.(BlockID)
 	}
+	sort.Slice(shardState.OpenBlocks, func(i, j int) bool {
+		x := shardState.OpenBlocks[i]
+		y := shardState.OpenBlocks[j]
+		if x.SegID == y.SegID {
+			return x.BlockID < y.BlockID
+		}
+		return x.SegID < y.SegID
+	})
+	s.l.Info().Interface("", shardState).Msg("state")
 	return shardState
 }
 
@@ -295,7 +310,6 @@
 	event := sc.l.Info()
 	if prev != nil {
 		event.Stringer("prev", prev)
-		prev.(*segment).blockManageStrategy.Close()
 	}
 	if next != nil {
 		event.Stringer("next", next)
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 7ef3839..6571d53 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -45,6 +45,7 @@
 			Expect(err).NotTo(HaveOccurred())
 		})
 		AfterEach(func() {
+			GinkgoWriter.Printf("shard state:%+v \n", shard.State())
 			shard.Close()
 			deferFn()
 		})
@@ -67,7 +68,7 @@
 			t1 := clock.Now()
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -80,8 +81,11 @@
 			clock.Add(10 * time.Hour)
 			t2 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/01 10:00 has been triggered")
+				}
 				return shard.State().Blocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -99,12 +103,15 @@
 			}))
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{}))
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
 			By("01/01 13:00 moves to the 2nd block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/01 13:00 has been triggered")
+				}
 				return shard.State().OpenBlocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -114,8 +121,11 @@
 			clock.Add(9 * time.Hour)
 			t3 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/01 22:00 has been triggered")
+				}
 				return shard.State().Blocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -141,8 +151,11 @@
 			By("01/02 01:00 moves to 3rd block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/02 01:00 has been triggered")
+				}
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -156,8 +169,11 @@
 			clock.Add(9 * time.Hour)
 			t4 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/02 10:00 has been triggered")
+				}
 				return shard.State().Blocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -190,8 +206,15 @@
 			By("01/02 13:00 moves to 4th block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/02 13:00 has been triggered")
+				}
 				return shard.State().OpenBlocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+				},
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -200,17 +223,16 @@
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
 				},
-				{
-					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
-					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
-				},
 			}))
 			By("01/02 22:00 5th block is opened")
 			clock.Add(9 * time.Hour)
 			t5 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/02 22:00 has been triggered")
+				}
 				return shard.State().Blocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -250,8 +272,15 @@
 			By("01/03 01:00 close 1st block by adding 5th block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/03 01:00 has been triggered")
+				}
 				return shard.State().OpenBlocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -260,14 +289,10 @@
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
 				},
-				{
-					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
-					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
-				},
 			}))
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -318,23 +343,23 @@
 			Expect(err).NotTo(HaveOccurred())
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
-				{
-					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
-					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
-				},
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
 				},
 				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
-					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
 				},
 			}))
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -348,7 +373,6 @@
 						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
 					},
 					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
-					Closed:    true,
 				},
 				{
 					ID: tsdb.BlockID{
@@ -356,6 +380,7 @@
 						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
 					},
 					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+					Closed:    true,
 				},
 				{
 					ID: tsdb.BlockID{
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 20d8e6e..2f77874 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -59,10 +59,10 @@
 	ErrInvalidShardID = errors.New("invalid shard id")
 	ErrOpenDatabase   = errors.New("fails to open the database")
 
-	encodingMethodKey = contextEncodingMethodKey{}
+	optionsKey = contextOptionsKey{}
 )
 
-type contextEncodingMethodKey struct{}
+type contextOptionsKey struct{}
 
 type Supplier interface {
 	SupplyTSDB() Database
@@ -84,11 +84,15 @@
 var _ Database = (*database)(nil)
 
 type DatabaseOpts struct {
-	Location       string
-	ShardNum       uint32
-	EncodingMethod EncodingMethod
-	SegmentSize    IntervalRule
-	BlockSize      IntervalRule
+	Location           string
+	ShardNum           uint32
+	EncodingMethod     EncodingMethod
+	SegmentSize        IntervalRule
+	BlockSize          IntervalRule
+	BlockMemSize       int64
+	SeriesMemSize      int64
+	EnableGlobalIndex  bool
+	GlobalIndexMemSize int64
 }
 
 type EncodingMethod struct {
@@ -111,8 +115,9 @@
 	Closed    bool
 }
 type ShardState struct {
-	Blocks     []BlockState
-	OpenBlocks []BlockID
+	Blocks           []BlockState
+	OpenBlocks       []BlockID
+	StrategyManagers []string
 }
 
 type database struct {
@@ -186,7 +191,7 @@
 		return nil, errors.Wrap(err, "failed to read directory contents failed")
 	}
 	thisContext := context.WithValue(ctx, logger.ContextKey, db.logger)
-	thisContext = context.WithValue(thisContext, encodingMethodKey, opts.EncodingMethod)
+	thisContext = context.WithValue(thisContext, optionsKey, opts)
 	if len(entries) > 0 {
 		return loadDatabase(thisContext, db)
 	}
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index 97c11f2..cd11765 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -26,7 +26,7 @@
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-var defaultEventallyTimeout = 30 * time.Second
+var defaultEventuallyTimeout = 30 * time.Second
 
 func TestTsdb(t *testing.T) {
 	RegisterFailHandler(Fail)
diff --git a/dist/LICENSE b/dist/LICENSE
index a8afea8..08d2763 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -256,7 +256,7 @@
     github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3 BSD-3-Clause
     github.com/pmezard/go-difflib v1.0.0 BSD-3-Clause
     github.com/spf13/pflag v1.0.5 BSD-3-Clause
-    golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e BSD-3-Clause
+    golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 BSD-3-Clause
     golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f BSD-3-Clause
     golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 BSD-3-Clause
     golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 BSD-3-Clause
@@ -282,7 +282,7 @@
 MIT licenses
 ========================================================================
 
-    github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8 MIT
+    github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 MIT
     github.com/beorn7/perks v1.0.1 MIT
     github.com/cespare/xxhash v1.1.0 MIT
     github.com/cespare/xxhash/v2 v2.1.2 MIT
@@ -305,7 +305,7 @@
     github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 MIT
     github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 MIT
     go.etcd.io/bbolt v1.3.6 MIT
-    go.uber.org/atomic v1.9.0 MIT
+    go.uber.org/atomic v1.7.0 MIT
     go.uber.org/multierr v1.8.0 MIT
     go.uber.org/zap v1.17.0 MIT
     gopkg.in/natefinch/lumberjack.v2 v2.0.0 MIT
diff --git a/go.mod b/go.mod
index 3b5b9d7..1cecc88 100644
--- a/go.mod
+++ b/go.mod
@@ -8,15 +8,20 @@
 	github.com/cespare/xxhash v1.1.0
 	github.com/dgraph-io/badger/v3 v3.2011.1
 	github.com/dgraph-io/ristretto v0.1.0
+	github.com/envoyproxy/protoc-gen-validate v0.1.0
+	github.com/go-chi/chi/v5 v5.0.7
 	github.com/golang/mock v1.6.0
-	github.com/golang/protobuf v1.5.2 // indirect
 	github.com/google/go-cmp v0.5.8
 	github.com/google/uuid v1.3.0
+	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
+	github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3
+	github.com/hashicorp/golang-lru v0.5.4
 	github.com/klauspost/compress v1.15.6
 	github.com/oklog/run v1.1.0
 	github.com/onsi/ginkgo/v2 v2.1.4
 	github.com/onsi/gomega v1.19.0
 	github.com/pkg/errors v0.9.1
+	github.com/prometheus/client_golang v1.12.2
 	github.com/rs/zerolog v1.26.1
 	github.com/spf13/cobra v1.4.0
 	github.com/spf13/pflag v1.0.5
@@ -25,25 +30,14 @@
 	go.etcd.io/etcd/client/v3 v3.5.4
 	go.etcd.io/etcd/server/v3 v3.5.4
 	go.uber.org/multierr v1.8.0
-	golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
+	golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f
+	golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3
 	google.golang.org/genproto v0.0.0-20220615141314-f1464d18c36b
 	google.golang.org/grpc v1.47.0
 	google.golang.org/protobuf v1.28.0
 )
 
 require (
-	github.com/envoyproxy/protoc-gen-validate v0.1.0
-	github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3
-	github.com/hashicorp/golang-lru v0.5.4
-	github.com/prometheus/client_golang v1.12.2
-	go.uber.org/atomic v1.9.0
-	golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f
-	golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3
-)
-
-require golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
-
-require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/bits-and-blooms/bitset v1.2.0 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
@@ -53,15 +47,14 @@
 	github.com/dustin/go-humanize v1.0.0 // indirect
 	github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
 	github.com/fsnotify/fsnotify v1.5.4 // indirect
-	github.com/go-chi/chi/v5 v5.0.7
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/glog v1.0.0 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
+	github.com/golang/protobuf v1.5.2 // indirect
 	github.com/golang/snappy v0.0.3 // indirect
 	github.com/google/btree v1.0.1 // indirect
 	github.com/google/flatbuffers v1.12.1 // indirect
 	github.com/gorilla/websocket v1.4.2 // indirect
-	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
 	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
@@ -106,11 +99,14 @@
 	go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
 	go.opentelemetry.io/otel/trace v0.20.0 // indirect
 	go.opentelemetry.io/proto/otlp v0.7.0 // indirect
+	go.uber.org/atomic v1.7.0 // indirect
 	go.uber.org/zap v1.17.0 // indirect
-	golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
+	golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
 	golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
+	golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
 	golang.org/x/text v0.3.7 // indirect
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
+	golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
 	gopkg.in/ini.v1 v1.66.4 // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
@@ -119,6 +115,6 @@
 )
 
 replace (
-	github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8
+	github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97
 	github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4
 )
diff --git a/go.sum b/go.sum
index d37cd1d..b1b19e4 100644
--- a/go.sum
+++ b/go.sum
@@ -50,8 +50,8 @@
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
 github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4 h1:iLwRXI6WHBMb2VkWrlYKIFngPKwgs2OnjliXdMB5DY0=
 github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4/go.mod h1:Q0luV7nB94o3Bl4hYqAPy03+QTtLxs9pWdUEQb0i0K0=
-github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8 h1:TK3KN7H7ROhT0/sfY8JWWa5xHOo5jUqW7Stc/VxNJyA=
-github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
+github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -512,9 +512,8 @@
 go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
-go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
-go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
@@ -536,8 +535,8 @@
 golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
 golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
-golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=
+golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -577,8 +576,6 @@
 golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
-golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
-golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
 golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 192a94f..b3eb099 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -46,15 +46,8 @@
 }
 
 func (f *FieldKey) Unmarshal(raw []byte) error {
-	switch len(raw) {
-	case 12:
-		f.SeriesID = common.SeriesID(convert.BytesToUint64(raw[0:8]))
-		f.IndexRuleID = convert.BytesToUint32(raw[8:])
-	case 4:
-		f.IndexRuleID = convert.BytesToUint32(raw)
-	default:
-		return errors.Wrap(ErrMalformed, "unmarshal field key")
-	}
+	f.SeriesID = common.SeriesID(convert.BytesToUint64(raw[0:8]))
+	f.IndexRuleID = convert.BytesToUint32(raw[8:])
 	return nil
 }
 
@@ -72,14 +65,17 @@
 }
 
 func (f *Field) Unmarshal(raw []byte) error {
+	if len(raw) < 13 {
+		return errors.WithMessagef(ErrMalformed, "malformed field: expected more than 12, got %d", len(raw))
+	}
 	fk := &f.Key
-	err := fk.Unmarshal(raw[:len(raw)-8])
+	err := fk.Unmarshal(raw[:12])
 	if err != nil {
 		return errors.Wrap(err, "unmarshal a field")
 	}
-	termID := raw[len(raw)-8:]
-	f.Term = make([]byte, len(termID))
-	copy(f.Term, termID)
+	term := raw[12:]
+	f.Term = make([]byte, len(term))
+	copy(f.Term, term)
 	return nil
 }
 
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 53de61b..e903098 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -55,14 +55,19 @@
 }
 
 type StoreOpts struct {
-	Path   string
-	Logger *logger.Logger
+	Path         string
+	Logger       *logger.Logger
+	MemTableSize int64
 }
 
 func NewStore(opts StoreOpts) (index.Store, error) {
 	var err error
 	var lsm kv.Store
-	if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil {
+	if lsm, err = kv.OpenStore(
+		0,
+		opts.Path+"/lsm",
+		kv.StoreWithLogger(opts.Logger),
+		kv.StoreWithMemTableSize(opts.MemTableSize)); err != nil {
 		return nil, err
 	}
 	return &store{
diff --git a/pkg/run/channel.go b/pkg/run/channel.go
deleted file mode 100644
index 82a9299..0000000
--- a/pkg/run/channel.go
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package run
-
-import (
-	"sync"
-)
-
-type Chan[T any] struct {
-	ch     chan T
-	closer sync.WaitGroup
-}
-
-func NewChan[T any](ch chan T) *Chan[T] {
-	return &Chan[T]{
-		ch: ch,
-	}
-}
-
-func (c *Chan[T]) Write(item T) {
-	c.closer.Add(1)
-	defer c.closer.Done()
-	c.ch <- item
-}
-
-func (c *Chan[T]) Read() (T, bool) {
-	item, more := <-c.ch
-	return item, more
-}
-
-func (c *Chan[T]) Close() error {
-	c.closer.Wait()
-	close(c.ch)
-	return nil
-}
diff --git a/pkg/test/helpers/fail_interceptor.go b/pkg/test/helpers/fail_interceptor.go
index a442d27..b971088 100644
--- a/pkg/test/helpers/fail_interceptor.go
+++ b/pkg/test/helpers/fail_interceptor.go
@@ -17,8 +17,9 @@
 package helpers
 
 import (
+	"sync/atomic"
+
 	"github.com/onsi/gomega/types"
-	"go.uber.org/atomic"
 )
 
 type FailInterceptor struct {
@@ -29,7 +30,7 @@
 func NewFailInterceptor(fail types.GomegaFailHandler) *FailInterceptor {
 	return &FailInterceptor{
 		ginkgoFail: fail,
-		didFail:    atomic.NewBool(false),
+		didFail:    &atomic.Bool{},
 	}
 }
 
diff --git a/pkg/timestamp/clock.go b/pkg/timestamp/clock.go
index 92b845a..322747a 100644
--- a/pkg/timestamp/clock.go
+++ b/pkg/timestamp/clock.go
@@ -34,6 +34,8 @@
 	Add(d time.Duration)
 	// Set sets the current time of the mock clock to a specific one.
 	Set(t time.Time)
+	// TriggerTimer sends the current time to timer.C
+	TriggerTimer() bool
 }
 
 // NewClock returns an instance of a real-time clock.