// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package tsdb
import (
"fmt"
"sync"
"time"
"unsafe"
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/wal"
)
const (
defaultSize = 1 << 20 // 1MB
nodeAlign = int(unsafe.Sizeof(uint64(0))) - 1
defaultWalSyncMode = false
)
var (
bufferMeterProvider meter.Provider
maxBytes meter.Gauge
mutableBytes meter.Gauge
)
func init() {
bufferMeterProvider = observability.NewMeterProvider(meterTSDB.SubScope("buffer"))
labelNames := append(common.LabelNames(), "bucket")
maxBytes = bufferMeterProvider.Gauge("max_bytes", labelNames...)
mutableBytes = bufferMeterProvider.Gauge("mutable_bytes", labelNames...)
}
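// operation is a single key/value write routed to a shard bucket. recoveryDoneFn is
// only set for entries replayed from the WAL and is invoked once the entry has been
// re-applied to the mutable skiplist.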
type operation struct {
recoveryDoneFn func()
key []byte
value []byte
epoch uint64
}
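// flushEvent carries an immutable skiplist to be flushed together with the WAL
// segment that can be deleted once the flush succeeds.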
type flushEvent struct {
data *skl.Skiplist
walSegmentID wal.SegmentID
}
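// onFlush is the callback invoked with a shard index and its detached skiplist when
// that shard is flushed; returning an error causes the flush loop to retry the callback.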
type onFlush func(shardIndex int, skl *skl.Skiplist) error
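// bufferShardBucket holds the mutable skiplist, the pending immutable skiplists, and
// the write/flush channels for a single shard.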
type bufferShardBucket struct {
mutable *skl.Skiplist
writeCh chan operation
flushCh chan flushEvent
writeWaitGroup *sync.WaitGroup
flushWaitGroup *sync.WaitGroup
log *logger.Logger
wal wal.WAL
immutables []*skl.Skiplist
labelValues []string
shardLabelValues []string
index int
capacity int
mutex sync.RWMutex
walSyncMode bool
enableWal bool
}
// Buffer is an in-memory write buffer composed of multiple shard buckets; writes are
// routed to a bucket by key hash, and a bucket is flushed once it reaches its configured size.
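//
// A minimal usage sketch, assuming an existing *logger.Logger and common.Position;
// the flush size, concurrency, shard count, and WAL path below are illustrative
// values, not defaults:
//
//	buf, err := NewBufferWithWal(log, position, 8<<20, 16, 2, true, "/tmp/banyandb-buffer-wal")
//	if err != nil {
//		// handle the error
//	}
//	buf.Register("writer", func(shardIndex int, memTable *skl.Skiplist) error {
//		// persist the flushed skiplist of this shard
//		return nil
//	})
//	ts := time.Now()
//	_ = buf.Write([]byte("series-key"), []byte("value"), ts)
//	if value, ok := buf.Read([]byte("series-key"), ts); ok {
//		_ = value // only an exact-timestamp match is returned
//	}
//	_ = buf.Close()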
type Buffer struct {
onFlushFn sync.Map
entryCloser *run.Closer
log *logger.Logger
buckets []bufferShardBucket
writeWaitGroup sync.WaitGroup
flushWaitGroup sync.WaitGroup
numShards int
enableWal bool
closerOnce sync.Once
}
// NewBuffer creates a new Buffer instance with the given parameters and with the write-ahead log disabled.
func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int) (*Buffer, error) {
return NewBufferWithWal(log, position, flushSize, writeConcurrency, numShards, false, "")
}
// NewBufferWithWal creates a new Buffer instance with the given parameters; when enableWal is true,
// writes are also appended to a write-ahead log under walPath so the buffer can be recovered after a restart.
func NewBufferWithWal(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int, enableWal bool, walPath string,
) (*Buffer, error) {
buckets := make([]bufferShardBucket, numShards)
buffer := &Buffer{
buckets: buckets,
numShards: numShards,
entryCloser: run.NewCloser(1),
log: log.Named("buffer"),
enableWal: enableWal,
}
buffer.writeWaitGroup.Add(numShards)
buffer.flushWaitGroup.Add(numShards)
for i := 0; i < numShards; i++ {
buckets[i] = bufferShardBucket{
index: i,
capacity: flushSize,
mutable: skl.NewSkiplist(int64(flushSize)),
writeCh: make(chan operation, writeConcurrency),
flushCh: make(chan flushEvent, 1),
writeWaitGroup: &buffer.writeWaitGroup,
flushWaitGroup: &buffer.flushWaitGroup,
log: buffer.log.Named(fmt.Sprintf("shard-%d", i)),
labelValues: append(position.LabelValues(), fmt.Sprintf("%d", i)),
shardLabelValues: position.ShardLabelValues(),
enableWal: enableWal,
}
buckets[i].start(buffer.flushers)
if enableWal {
if walPath == "" {
return nil, errors.New("wal path is required")
}
shardWalPath := fmt.Sprintf("%s/buffer-%d", walPath, i)
if err := buckets[i].startWal(shardWalPath, defaultWalSyncMode); err != nil {
return nil, errors.Wrap(err, "failed to start wal")
}
}
maxBytes.Set(float64(flushSize), buckets[i].labelValues...)
}
return buffer, nil
}
// Register registers a callback function under the given id to be invoked when a shard bucket is flushed;
// an existing registration for the same id is kept.
func (b *Buffer) Register(id string, onFlushFn onFlush) {
b.onFlushFn.LoadOrStore(id, onFlushFn)
}
// Unregister removes the callback function registered under the given id.
func (b *Buffer) Unregister(id string) {
b.onFlushFn.Delete(id)
}
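// flushers returns a snapshot of the currently registered flush callbacks.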
func (b *Buffer) flushers() []onFlush {
var flushers []onFlush
b.onFlushFn.Range(func(key, value interface{}) bool {
flushers = append(flushers, value.(onFlush))
return true
})
return flushers
}
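// isEmpty reports whether no flush callbacks are registered, i.e. the buffer has no remaining users.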
func (b *Buffer) isEmpty() bool {
return len(b.flushers()) == 0
}
// Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
func (b *Buffer) Write(key, value []byte, timestamp time.Time) error {
if b == nil || !b.entryCloser.AddRunning() {
return errors.New("buffer is invalid")
}
defer b.entryCloser.Done()
index := b.getShardIndex(key)
if b.log.Debug().Enabled() {
b.log.Debug().Uint64("shard", index).Bytes("key", key).
Time("ts", timestamp).Msg("route a shard")
}
if b.enableWal {
if err := b.buckets[index].writeWal(key, value, timestamp); err != nil {
return errors.Wrap(err, "failed to write wal")
}
}
b.buckets[index].writeCh <- operation{key: key, value: value, epoch: uint64(timestamp.UnixNano())}
return nil
}
// Read retrieves the value written for the given key at exactly the given timestamp from the appropriate shard bucket in the buffer.
func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
if b == nil || !b.entryCloser.AddRunning() {
return nil, false
}
defer b.entryCloser.Done()
keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
index := b.getShardIndex(key)
epoch := uint64(ts.UnixNano())
ll, deferFn := b.buckets[index].getAll()
defer deferFn()
for _, bk := range ll {
value := bk.Get(keyWithTS)
if value.Meta == 0 && value.Value == nil {
continue
}
if value.Version == epoch {
return value.Value, true
}
}
return nil, false
}
// Close gracefully closes the Buffer and ensures that all pending operations are completed.
func (b *Buffer) Close() error {
if b == nil {
return nil
}
b.closerOnce.Do(func() {
b.entryCloser.Done()
b.entryCloser.CloseThenWait()
for i := 0; i < b.numShards; i++ {
close(b.buckets[i].writeCh)
}
b.writeWaitGroup.Wait()
for i := 0; i < b.numShards; i++ {
ff := b.flushers()
for _, fn := range ff {
if err := fn(i, b.buckets[i].mutable); err != nil {
b.buckets[i].log.Err(err).Msg("flushing mutable buffer failed")
}
}
b.buckets[i].mutable.DecrRef()
}
for i := 0; i < b.numShards; i++ {
close(b.buckets[i].flushCh)
if b.enableWal {
if err := b.buckets[i].wal.Close(); err != nil {
b.buckets[i].log.Err(err).Msg("closing buffer shard wal failed")
}
}
}
b.flushWaitGroup.Wait()
})
return nil
}
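// getShardIndex hashes the key to select the shard bucket that owns it.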
func (b *Buffer) getShardIndex(key []byte) uint64 {
return convert.Hash(key) % uint64(b.numShards)
}
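// getAll returns the mutable skiplist first, then the immutables from newest to oldest,
// with their reference counts incremented; the returned function releases those references.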
func (bsb *bufferShardBucket) getAll() ([]*skl.Skiplist, func()) {
bsb.mutex.RLock()
defer bsb.mutex.RUnlock()
allList := make([]*skl.Skiplist, len(bsb.immutables)+1)
bsb.mutable.IncrRef()
allList[0] = bsb.mutable
last := len(bsb.immutables) - 1
for i := range bsb.immutables {
allList[i+1] = bsb.immutables[last-i]
bsb.immutables[last-i].IncrRef()
}
return allList, func() {
for _, l := range allList {
l.DecrRef()
}
}
}
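// start launches two goroutines per bucket: a flush loop that drains flushCh, retries failed
// callbacks, deletes the flushed WAL segment, trims immutables, and updates the flush metrics;
// and a write loop that drains writeCh into the mutable skiplist and triggers a flush once the
// bucket reaches its capacity.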
func (bsb *bufferShardBucket) start(flushers func() []onFlush) {
go func() {
defer func() {
for _, g := range []meter.Gauge{maxBytes, mutableBytes} {
g.Delete(bsb.labelValues...)
}
}()
defer bsb.flushWaitGroup.Done()
for event := range bsb.flushCh {
oldSkipList := event.data
memSize := oldSkipList.MemSize()
onFlushFnDone := false
t1 := time.Now()
ff := flushers()
for {
if !onFlushFnDone {
failedFns := make([]onFlush, 0)
for i := 0; i < len(ff); i++ {
fn := ff[i]
if err := fn(bsb.index, oldSkipList); err != nil {
bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
failedFns = append(failedFns, fn)
}
}
if len(failedFns) > 0 {
flushNum.Inc(1, append(bsb.labelValues[:2], "true")...)
time.Sleep(time.Second)
ff = failedFns
continue
}
onFlushFnDone = true
}
if bsb.enableWal {
if err := bsb.wal.Delete(event.walSegmentID); err != nil {
bsb.log.Err(err).Msg("delete wal segment file failed. Retrying...")
time.Sleep(time.Second)
continue
}
}
break
}
flushLatency.Observe(time.Since(t1).Seconds(), bsb.shardLabelValues...)
flushBytes.Inc(float64(memSize), bsb.shardLabelValues...)
flushNum.Inc(1, append(bsb.shardLabelValues, "false")...)
bsb.mutex.Lock()
if len(bsb.immutables) > 0 {
bsb.immutables = bsb.immutables[1:]
}
bsb.mutex.Unlock()
oldSkipList.DecrRef()
}
}()
go func() {
defer bsb.writeWaitGroup.Done()
volume := 0
for op := range bsb.writeCh {
k := y.KeyWithTs(op.key, op.epoch)
v := y.ValueStruct{Value: op.value}
volume += len(k) + int(v.EncodedSize()) + skl.MaxNodeSize + nodeAlign
memSize := bsb.mutable.MemSize()
mutableBytes.Set(float64(memSize), bsb.labelValues...)
if op.recoveryDoneFn == nil && (volume >= bsb.capacity || memSize >= int64(bsb.capacity)) {
if err := bsb.triggerFlushing(); err != nil {
bsb.log.Err(err).Msg("triggering flushing failed")
} else {
volume = 0
}
}
bsb.mutable.Put(k, v)
if bsb.enableWal && op.recoveryDoneFn != nil {
op.recoveryDoneFn()
}
}
}()
}
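// triggerFlushing rotates the WAL (when enabled), queues the current mutable skiplist on flushCh,
// and swaps in a fresh mutable skiplist; if the flush channel is full it retries every 10 seconds.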
func (bsb *bufferShardBucket) triggerFlushing() error {
var walSegmentID wal.SegmentID
if bsb.enableWal {
segment, err := bsb.wal.Rotate()
if err != nil {
return errors.Wrap(err, "rotating wal failed")
}
walSegmentID = segment.GetSegmentID()
}
for {
select {
case bsb.flushCh <- flushEvent{data: bsb.mutable, walSegmentID: walSegmentID}:
bsb.mutex.Lock()
defer bsb.mutex.Unlock()
bsb.swap()
return nil
default:
}
time.Sleep(10 * time.Second)
}
}
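// swap moves the current mutable skiplist into immutables and allocates a fresh one;
// triggerFlushing calls it while holding the bucket mutex.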
func (bsb *bufferShardBucket) swap() {
bsb.immutables = append(bsb.immutables, bsb.mutable)
bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
}
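// startWal initializes the shard's write-ahead log at the given path and replays any existing
// segments into the buffer.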
func (bsb *bufferShardBucket) startWal(path string, syncMode bool) error {
w, err := wal.New(path, wal.DefaultOptions)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("failed to create wal: %s", path))
}
bsb.wal = w
bsb.walSyncMode = syncMode
bsb.log.Info().Msg(fmt.Sprintf(
"wal started with path: %s, sync mode: %v", path, syncMode))
if err = bsb.recoveryWal(); err != nil {
return errors.Wrap(err, "failed to recover wal")
}
return nil
}
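// recoveryWal replays every WAL segment: sealed segments are loaded straight into the skiplist
// and re-flushed, while the newest (working) segment is replayed through the write channel.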
func (bsb *bufferShardBucket) recoveryWal() error {
segments, err := bsb.wal.ReadAllSegments()
if err != nil {
return errors.Wrap(err, "failed to load wal segments")
}
recoveredRecords := 0
for index, segment := range segments {
recoveredRecords += len(segment.GetLogEntries())
isWorkSegment := index == len(segments)-1
if isWorkSegment {
bsb.recoveryWorkSegment(segment)
} else {
bsb.recoveryStableSegment(segment)
}
bsb.log.Info().Msg(fmt.Sprintf(
"recovered %d log records from wal segment %d",
len(segment.GetLogEntries()),
segment.GetSegmentID()))
}
bsb.log.Info().Msg(fmt.Sprintf(
"recovered %d log records from wal", recoveredRecords))
return nil
}
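// recoveryWorkSegment replays the working segment through writeCh so the normal write path
// (including size-based flushing) applies, and blocks until every replayed entry has been re-applied.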
func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) {
var wg sync.WaitGroup
for _, logEntry := range segment.GetLogEntries() {
timestamps := logEntry.GetTimestamps()
values := logEntry.GetValues()
elementIndex := 0
for element := values.Front(); element != nil; element = element.Next() {
timestamp := timestamps[elementIndex]
wg.Add(1)
bsb.writeCh <- operation{
key: logEntry.GetSeriesID(),
value: element.Value.([]byte),
epoch: uint64(timestamp.UnixNano()),
recoveryDoneFn: func() {
wg.Done()
if bsb.log.Trace().Enabled() {
bsb.log.Trace().Msg(fmt.Sprintf("recovered key: %v, ts: %v",
logEntry.GetSeriesID(), timestamp.UnixNano()))
}
},
}
elementIndex++
}
}
wg.Wait()
}
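// recoveryStableSegment loads a sealed segment directly into the mutable skiplist and queues it
// for flushing so the corresponding WAL segment can be deleted afterwards.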
func (bsb *bufferShardBucket) recoveryStableSegment(segment wal.Segment) {
for _, logEntries := range segment.GetLogEntries() {
timestamps := logEntries.GetTimestamps()
values := logEntries.GetValues()
elementIndex := 0
for element := values.Front(); element != nil; element = element.Next() {
timestamp := timestamps[elementIndex]
k := y.KeyWithTs(logEntries.GetSeriesID(), uint64(timestamp.UnixNano()))
v := y.ValueStruct{Value: element.Value.([]byte)}
bsb.mutable.Put(k, v)
elementIndex++
}
}
bsb.flushCh <- flushEvent{data: bsb.mutable, walSegmentID: segment.GetSegmentID()}
// Move the recovered data into immutables and start a fresh mutable skiplist for new writes.
bsb.swap()
}
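// writeWal appends the entry to the shard's WAL; in sync mode it blocks until the WAL invokes
// the callback and propagates any error.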
func (bsb *bufferShardBucket) writeWal(key, value []byte, timestamp time.Time) error {
if !bsb.walSyncMode {
bsb.wal.Write(key, timestamp, value, nil)
return nil
}
var walErr error
var wg sync.WaitGroup
wg.Add(1)
walCallback := func(_ []byte, _ time.Time, _ []byte, err error) {
walErr = err
wg.Done()
}
bsb.wal.Write(key, timestamp, value, walCallback)
wg.Wait()
return walErr
}
// BufferSupplier lends named Buffers to callers and closes a Buffer once every borrower has returned it.
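//
// A hedged sketch of the borrow/return cycle; the names and sizes below are illustrative only:
//
//	supplier := NewBufferSupplier(log, position, 16, 2, false, "")
//	buf, err := supplier.Borrow("stream-buffer", "writer-1", 8<<20, flushFn)
//	if err != nil {
//		// handle the error
//	}
//	_ = buf // shared with any other borrower of "stream-buffer"
//	supplier.Return("stream-buffer", "writer-1")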
type BufferSupplier struct {
l *logger.Logger
p common.Position
buffers sync.Map
path string
writeConcurrency int
numShards int
enableWAL bool
}
// NewBufferSupplier creates a new BufferSupplier instance with the given parameters.
func NewBufferSupplier(l *logger.Logger, p common.Position, writeConcurrency, numShards int, enableWAL bool, path string) *BufferSupplier {
return &BufferSupplier{
l: l.Named("buffer-supplier"),
p: p,
writeConcurrency: writeConcurrency,
numShards: numShards,
enableWAL: enableWAL,
path: path,
}
}
// Borrow borrows a Buffer from the BufferSupplier.
func (b *BufferSupplier) Borrow(bufferName, name string, bufferSize int, onFlushFn onFlush) (buffer *Buffer, err error) {
if bufferName == "" || name == "" {
return nil, errors.New("bufferName and name are required")
}
if onFlushFn == nil {
return nil, errors.New("onFlushFn is required")
}
defer func() {
if buffer != nil {
buffer.Register(name, onFlushFn)
}
}()
if v, ok := b.buffers.Load(bufferName); ok {
buffer = v.(*Buffer)
return buffer, nil
}
if buffer, err = NewBufferWithWal(b.l.Named("buffer-"+bufferName), b.p,
bufferSize, b.writeConcurrency, b.numShards, b.enableWAL, b.path); err != nil {
return nil, err
}
if v, ok := b.buffers.LoadOrStore(bufferName, buffer); ok {
_ = buffer.Close()
buffer = v.(*Buffer)
return buffer, nil
}
return buffer, nil
}
// Return returns a Buffer to the BufferSupplier.
func (b *BufferSupplier) Return(bufferName, name string) {
if v, ok := b.buffers.Load(bufferName); ok {
buffer := v.(*Buffer)
buffer.Unregister(name)
if buffer.isEmpty() {
b.buffers.Delete(bufferName)
_ = buffer.Close()
}
}
}
// Volume returns the number of Buffers in the BufferSupplier.
func (b *BufferSupplier) Volume() int {
volume := 0
b.buffers.Range(func(key, value interface{}) bool {
volume++
return true
})
return volume
}
// Close closes all Buffers in the BufferSupplier.
func (b *BufferSupplier) Close() error {
b.buffers.Range(func(key, value interface{}) bool {
buffer := value.(*Buffer)
_ = buffer.Close()
return true
})
return nil
}