| // Licensed to Apache Software Foundation (ASF) under one or more contributor |
| // license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright |
| // ownership. Apache Software Foundation (ASF) licenses this file to you under |
| // the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package tsdb |
| |
| import ( |
| "context" |
| "fmt" |
| "io" |
| "os" |
| "strconv" |
| "strings" |
| "sync" |
| |
| "github.com/pkg/errors" |
| "go.uber.org/multierr" |
| |
| "github.com/apache/skywalking-banyandb/api/common" |
| "github.com/apache/skywalking-banyandb/pkg/encoding" |
| "github.com/apache/skywalking-banyandb/pkg/logger" |
| "github.com/apache/skywalking-banyandb/pkg/timestamp" |
| ) |
| |
// Path-building constants for the on-disk layout. Every *Template value is a
// fmt format string that begins with rootPrefix, i.e. a "%s" verb for the
// parent directory followed by the OS path separator. shardTemplate then
// expects a numeric ID (%d); segTemplate and blockTemplate expect a string
// suffix (%s).
const (
	shardPathPrefix     = "shard"
	pathSeparator       = string(os.PathSeparator)
	rootPrefix          = "%s" + pathSeparator
	shardTemplate       = rootPrefix + shardPathPrefix + "-%d"
	seriesTemplate      = rootPrefix + "series"
	segPathPrefix       = "seg"
	segTemplate         = rootPrefix + segPathPrefix + "-%s"
	blockPathPrefix     = "block"
	blockTemplate       = rootPrefix + blockPathPrefix + "-%s"
	globalIndexTemplate = rootPrefix + "index"

	// NOTE(review): these look like Go reference-time layouts used to name
	// segment and block directories per hour or per day; their call sites are
	// outside this part of the file, so confirm before relying on that.
	segHourFormat   = "2006010215"
	segDayFormat    = "20060102"
	blockHourFormat = "15"
	blockDayFormat  = "0102"

	// dirPerm is the permission mode mkdir passes to os.MkdirAll.
	dirPerm = 0o700
)
| |
var (
	// ErrInvalidShardID is returned by (*database).Shard when the requested
	// ID is outside the range of the loaded shards.
	ErrInvalidShardID = errors.New("invalid shard id")
	// ErrOpenDatabase is the base error OpenDatabase wraps when it rejects
	// the supplied options.
	ErrOpenDatabase = errors.New("fails to open the database")

	// encodingMethodKey is the context key under which OpenDatabase stores
	// the configured EncodingMethod.
	encodingMethodKey = contextEncodingMethodKey{}
)

// contextEncodingMethodKey is the unexported type of encodingMethodKey, so
// the key cannot collide with context keys defined by other packages.
type contextEncodingMethodKey struct{}
| |
// Supplier is implemented by components that can hand out a Database.
type Supplier interface {
	// SupplyTSDB returns the Database owned by the implementation.
	SupplyTSDB() Database
}
// Database is a closable collection of shards.
type Database interface {
	io.Closer
	// Shards returns all shards of the database.
	Shards() []Shard
	// Shard returns the shard with the given ID, or an error if no such
	// shard exists.
	Shard(id common.ShardID) (Shard, error)
}
| |
// Shard is a closable partition of a Database. It exposes its own ID, its
// series and index databases, and a snapshot of its state.
type Shard interface {
	io.Closer
	// ID returns the identifier of the shard.
	ID() common.ShardID
	// Series returns the series database of the shard.
	Series() SeriesDatabase
	// Index returns the index database of the shard.
	Index() IndexDatabase
	// State returns the ShardState of the shard.
	State() ShardState
}
| |
// Compile-time check that *database satisfies the Database interface.
var _ Database = (*database)(nil)
| |
// DatabaseOpts carries the settings consumed by OpenDatabase.
//
// Location is the root directory of the database; OpenDatabase creates it if
// needed. ShardNum is the number of shards the database should hold.
// EncodingMethod must have both pools set. SegmentSize and BlockSize are
// optional: when their Num is zero, OpenDatabase falls back to one DAY and
// two HOURs respectively. BlockSize must not have a longer estimated
// duration than SegmentSize.
type DatabaseOpts struct {
	Location       string
	ShardNum       uint32
	EncodingMethod EncodingMethod
	SegmentSize    IntervalRule
	BlockSize      IntervalRule
}
| |
// EncodingMethod bundles the series encoder and decoder pools. OpenDatabase
// rejects options in which either pool is nil.
type EncodingMethod struct {
	EncoderPool encoding.SeriesEncoderPool
	DecoderPool encoding.SeriesDecoderPool
}
| |
// BlockID pairs a segment ID with a block ID.
// NOTE(review): both values are presumably produced by GenerateInternalID;
// confirm against the code that constructs BlockID.
type BlockID struct {
	SegID   uint16
	BlockID uint16
}
| |
| func GenerateInternalID(unit IntervalUnit, suffix int) uint16 { |
| return uint16(unit)<<12 | ((uint16(suffix) << 4) >> 4) |
| } |
| |
// BlockState describes one block: its identifier, the time range attached
// to it, and whether it is closed.
type BlockState struct {
	ID        BlockID
	TimeRange timestamp.TimeRange
	Closed    bool
}
// ShardState is the value returned by Shard.State: a list of block states
// plus the IDs of the blocks that are open.
type ShardState struct {
	Blocks     []BlockState
	OpenBlocks []BlockID
}
| |
// database is the Database implementation returned by OpenDatabase. It keeps
// the settings resolved from DatabaseOpts together with the opened shards.
type database struct {
	logger      *logger.Logger
	location    string
	shardNum    uint32
	segmentSize IntervalRule
	blockSize   IntervalRule

	// sLst holds the opened shards; Shard indexes it directly by shard ID.
	sLst []Shard
	// The embedded mutex is held by initDatabase and loadDatabase while sLst
	// is being populated.
	sync.Mutex
}
| |
| func (d *database) Shards() []Shard { |
| return d.sLst |
| } |
| |
| func (d *database) Shard(id common.ShardID) (Shard, error) { |
| if uint(id) >= uint(len(d.sLst)) { |
| return nil, ErrInvalidShardID |
| } |
| return d.sLst[id], nil |
| } |
| |
| func (d *database) Close() error { |
| var err error |
| for _, s := range d.sLst { |
| innerErr := s.Close() |
| if innerErr != nil { |
| err = multierr.Append(err, innerErr) |
| } |
| } |
| return err |
| } |
| |
| func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { |
| if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil { |
| return nil, errors.Wrap(ErrOpenDatabase, "encoding method is absent") |
| } |
| if _, err := mkdir(opts.Location); err != nil { |
| return nil, err |
| } |
| segmentSize := opts.SegmentSize |
| if segmentSize.Num == 0 { |
| segmentSize = IntervalRule{ |
| Unit: DAY, |
| Num: 1, |
| } |
| } |
| blockSize := opts.BlockSize |
| if blockSize.Num == 0 { |
| blockSize = IntervalRule{ |
| Unit: HOUR, |
| Num: 2, |
| } |
| } |
| if blockSize.EstimatedDuration() > segmentSize.EstimatedDuration() { |
| return nil, errors.Wrapf(ErrOpenDatabase, "the block size is bigger than the segment size") |
| } |
| db := &database{ |
| location: opts.Location, |
| shardNum: opts.ShardNum, |
| logger: logger.Fetch(ctx, "tsdb"), |
| segmentSize: segmentSize, |
| blockSize: blockSize, |
| } |
| db.logger.Info().Str("path", opts.Location).Msg("initialized") |
| var entries []os.DirEntry |
| var err error |
| if entries, err = os.ReadDir(opts.Location); err != nil { |
| return nil, errors.Wrap(err, "failed to read directory contents failed") |
| } |
| thisContext := context.WithValue(ctx, logger.ContextKey, db.logger) |
| thisContext = context.WithValue(thisContext, encodingMethodKey, opts.EncodingMethod) |
| if len(entries) > 0 { |
| return loadDatabase(thisContext, db) |
| } |
| return initDatabase(thisContext, db) |
| } |
| |
| func initDatabase(ctx context.Context, db *database) (Database, error) { |
| db.Lock() |
| defer db.Unlock() |
| return createDatabase(ctx, db, 0) |
| } |
| |
| func createDatabase(ctx context.Context, db *database, startID int) (Database, error) { |
| var err error |
| for i := startID; i < int(db.shardNum); i++ { |
| db.logger.Info().Int("shard_id", i).Msg("creating a shard") |
| so, errNewShard := OpenShard(ctx, common.ShardID(i), |
| db.location, db.segmentSize, db.blockSize, defaultBlockQueueSize) |
| if errNewShard != nil { |
| err = multierr.Append(err, errNewShard) |
| continue |
| } |
| db.sLst = append(db.sLst, so) |
| } |
| return db, err |
| } |
| |
| func loadDatabase(ctx context.Context, db *database) (Database, error) { |
| // TODO: open the lock file |
| // TODO: open the manifest file |
| db.Lock() |
| defer db.Unlock() |
| err := WalkDir(db.location, shardPathPrefix, func(suffix, _ string) error { |
| shardID, err := strconv.Atoi(suffix) |
| if err != nil { |
| return err |
| } |
| if shardID >= int(db.shardNum) { |
| return nil |
| } |
| db.logger.Info().Int("shard_id", shardID).Msg("opening a existing shard") |
| so, errOpenShard := OpenShard( |
| context.WithValue(ctx, logger.ContextKey, db.logger), |
| common.ShardID(shardID), |
| db.location, |
| db.segmentSize, |
| db.blockSize, |
| defaultBlockQueueSize, |
| ) |
| if errOpenShard != nil { |
| return errOpenShard |
| } |
| db.sLst = append(db.sLst, so) |
| return nil |
| }) |
| if err != nil { |
| return nil, errors.WithMessage(err, "load the database failed") |
| } |
| |
| loadedShardsNum := len(db.sLst) |
| if loadedShardsNum < int(db.shardNum) { |
| _, err := createDatabase(ctx, db, loadedShardsNum) |
| if err != nil { |
| return nil, errors.WithMessage(err, "load the database failed") |
| } |
| |
| } |
| return db, nil |
| } |
| |
// WalkFn is the callback invoked by WalkDir for each matching directory.
// suffix is the part of the directory name after its last "-" (the whole
// name when it contains none). absolutePath is root joined with the directory
// name by the OS path separator, so despite its name it is only absolute when
// root is. A non-nil return value stops the walk.
type WalkFn func(suffix, absolutePath string) error
| |
| func WalkDir(root, prefix string, walkFn WalkFn) error { |
| files, err := os.ReadDir(root) |
| if err != nil { |
| return errors.Wrapf(err, "failed to walk the database path: %s", root) |
| } |
| for _, f := range files { |
| if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) { |
| continue |
| } |
| segs := strings.Split(f.Name(), "-") |
| errWalk := walkFn(segs[len(segs)-1], fmt.Sprintf(rootPrefix, root)+f.Name()) |
| if errWalk != nil { |
| return errors.WithMessagef(errWalk, "failed to load: %s", f.Name()) |
| } |
| } |
| return nil |
| } |
| |
| func mkdir(format string, a ...interface{}) (path string, err error) { |
| path = fmt.Sprintf(format, a...) |
| if err = os.MkdirAll(path, dirPerm); err != nil { |
| return "", errors.Wrapf(err, "failed to create %s", path) |
| } |
| return path, err |
| } |