blob: 453548e1cf95dbf15d512edb7fa4abef9a30892e [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package tsdb
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var ErrEndOfSegment = errors.New("reached the end of the segment")
type segment struct {
id uint16
path string
suffix string
globalIndex kv.Store
l *logger.Logger
timestamp.TimeRange
bucket.Reporter
blockController *blockController
blockManageStrategy *bucket.Strategy
}
func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
segmentSize, blockSize IntervalRule, blockQueue bucket.Queue,
) (s *segment, err error) {
suffixInteger, err := strconv.Atoi(suffix)
if err != nil {
return nil, err
}
id := GenerateInternalID(segmentSize.Unit, suffixInteger)
timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
l := logger.Fetch(ctx, "segment")
segCtx := context.WithValue(ctx, logger.ContextKey, l)
clock, segCtx := timestamp.GetClock(segCtx)
s = &segment{
id: id,
path: path,
suffix: suffix,
l: l,
blockController: newBlockController(segCtx, id, path, timeRange, blockSize, l, blockQueue),
TimeRange: timeRange,
Reporter: bucket.NewTimeBasedReporter(timeRange, clock),
}
err = s.blockController.open()
if err != nil {
return nil, err
}
indexPath, err := mkdir(globalIndexTemplate, path)
if err != nil {
return nil, err
}
o := ctx.Value(optionsKey)
if o != nil {
options := o.(DatabaseOpts)
if options.EnableGlobalIndex {
memSize := options.GlobalIndexMemSize
if memSize == 0 {
memSize = defaultKVMemorySize
}
if s.globalIndex, err = kv.OpenStore(
0,
indexPath,
kv.StoreWithLogger(s.l),
kv.StoreWithMemTableSize(memSize),
); err != nil {
return nil, err
}
}
}
s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
if err != nil {
return nil, err
}
s.blockManageStrategy.Run()
return s, nil
}
func (s *segment) close() {
s.blockController.close()
if s.globalIndex != nil {
s.globalIndex.Close()
}
s.Stop()
}
func (s *segment) closeBlock(id uint16) {
s.blockController.closeBlock(id)
}
func (s segment) String() string {
return fmt.Sprintf("SegID-%d", s.id)
}
func (s *segment) Stats() observability.Statistics {
if s.globalIndex == nil {
return observability.Statistics{}
}
return s.globalIndex.Stats()
}
type blockController struct {
sync.RWMutex
segCtx context.Context
segID uint16
location string
segTimeRange timestamp.TimeRange
blockSize IntervalRule
lst []*block
blockQueue bucket.Queue
clock timestamp.Clock
l *logger.Logger
}
func newBlockController(segCtx context.Context, segID uint16, location string, segTimeRange timestamp.TimeRange,
blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue,
) *blockController {
clock, _ := timestamp.GetClock(segCtx)
return &blockController{
segCtx: segCtx,
segID: segID,
location: location,
blockSize: blockSize,
segTimeRange: segTimeRange,
blockQueue: blockQueue,
l: l,
clock: clock,
}
}
func (bc *blockController) Current() bucket.Reporter {
bc.RLock()
defer bc.RUnlock()
now := bc.clock.Now()
for _, s := range bc.lst {
if s.suffix == bc.Format(now) {
return s
}
}
// return the latest segment before now
if len(bc.lst) > 0 {
return bc.lst[len(bc.lst)-1]
}
return nil
}
func (bc *blockController) Next() (bucket.Reporter, error) {
b := bc.Current().(*block)
reporter, err := bc.create(
bc.blockSize.NextTime(b.Start))
if errors.Is(err, ErrEndOfSegment) {
return nil, bucket.ErrNoMoreBucket
}
if err != nil {
return nil, err
}
return reporter, err
}
func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
event := bc.l.Info()
if prev != nil {
event.Stringer("prev", prev)
bc.blockQueue.Push(BlockID{
SegID: bc.segID,
BlockID: prev.(*block).blockID,
})
}
if next != nil {
event.Stringer("next", next)
}
event.Msg("move to the next block")
}
func (bc *blockController) Format(tm time.Time) string {
switch bc.blockSize.Unit {
case HOUR:
return tm.Format(blockHourFormat)
case DAY:
return tm.Format(blockDayFormat)
}
panic("invalid interval unit")
}
func (bc *blockController) Parse(value string) (time.Time, error) {
switch bc.blockSize.Unit {
case HOUR:
return time.ParseInLocation(blockHourFormat, value, time.Local)
case DAY:
return time.ParseInLocation(blockDayFormat, value, time.Local)
}
panic("invalid interval unit")
}
func (bc *blockController) span(timeRange timestamp.TimeRange) ([]blockDelegate, error) {
bb := bc.search(func(b *block) bool {
return b.Overlapping(timeRange)
})
if bb == nil {
return nil, nil
}
dd := make([]blockDelegate, len(bb))
for i, b := range bb {
d, err := b.delegate()
if err != nil {
return nil, err
}
dd[i] = d
}
return dd, nil
}
func (bc *blockController) get(blockID uint16) (blockDelegate, error) {
b := bc.getBlock(blockID)
if b != nil {
return b.delegate()
}
return nil, nil
}
func (bc *blockController) getBlock(blockID uint16) *block {
bb := bc.search(func(b *block) bool {
return b.blockID == blockID
})
if len(bb) > 0 {
return bb[0]
}
return nil
}
func (bc *blockController) blocks() []*block {
bc.RLock()
defer bc.RUnlock()
r := make([]*block, len(bc.lst))
copy(r, bc.lst)
return r
}
func (bc *blockController) search(matcher func(*block) bool) (bb []*block) {
snapshotLst := func() []*block {
bc.RLock()
defer bc.RUnlock()
return bc.lst
}
lst := snapshotLst()
last := len(lst) - 1
for i := range lst {
b := lst[last-i]
if matcher(b) {
bb = append(bb, b)
}
}
return bb
}
func (bc *blockController) closeBlock(blockID uint16) {
bc.RLock()
defer bc.RUnlock()
b := bc.getBlock(blockID)
if b == nil {
return
}
b.close()
}
func (bc *blockController) startTime(suffix string) (time.Time, error) {
t, err := bc.Parse(suffix)
if err != nil {
return time.Time{}, err
}
startTime := bc.segTimeRange.Start
switch bc.blockSize.Unit {
case HOUR:
return time.Date(startTime.Year(), startTime.Month(),
startTime.Day(), t.Hour(), 0, 0, 0, startTime.Location()), nil
case DAY:
return time.Date(startTime.Year(), startTime.Month(),
t.Day(), t.Hour(), 0, 0, 0, startTime.Location()), nil
}
panic("invalid interval unit")
}
func (bc *blockController) open() error {
err := WalkDir(
bc.location,
segPathPrefix,
func(suffix, absolutePath string) error {
_, err := bc.load(suffix, absolutePath)
return err
})
if err != nil {
return err
}
if bc.Current() == nil {
_, err := bc.create(bc.clock.Now())
if err != nil {
return err
}
}
return nil
}
func (bc *blockController) create(startTime time.Time) (*block, error) {
if startTime.Before(bc.segTimeRange.Start) {
startTime = bc.segTimeRange.Start
}
if !startTime.Before(bc.segTimeRange.End) {
return nil, ErrEndOfSegment
}
suffix := bc.Format(startTime)
segPath, err := mkdir(blockTemplate, bc.location, suffix)
if err != nil {
return nil, err
}
return bc.load(suffix, segPath)
}
func (bc *blockController) load(suffix, path string) (b *block, err error) {
starTime, err := bc.startTime(suffix)
if err != nil {
return nil, err
}
if b, err = newBlock(
common.SetPosition(bc.segCtx, func(p common.Position) common.Position {
p.Block = suffix
return p
}),
blockOpts{
segID: bc.segID,
path: path,
startTime: starTime,
suffix: suffix,
blockSize: bc.blockSize,
queue: bc.blockQueue,
}); err != nil {
return nil, err
}
bc.Lock()
defer bc.Unlock()
bc.lst = append(bc.lst, b)
return b, nil
}
func (bc *blockController) close() {
for _, s := range bc.lst {
s.close()
}
}