blob: 7e29d0b411c9036ab99862b473e1aa59ddbc5583 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"time"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
var (
_ encoding.SeriesEncoderPool = (*encoderPool)(nil)
_ encoding.SeriesDecoderPool = (*decoderPool)(nil)
intervalFn = func(key []byte) time.Duration {
_, interval, err := decodeFieldFlag(key)
if err != nil {
panic(err)
}
return interval
}
)
type encoderPool struct {
intPool encoding.SeriesEncoderPool
defaultPool encoding.SeriesEncoderPool
l *logger.Logger
}
func newEncoderPool(plainSize, intSize int, l *logger.Logger) encoding.SeriesEncoderPool {
return &encoderPool{
intPool: encoding.NewIntEncoderPool(intSize, intervalFn),
defaultPool: encoding.NewPlainEncoderPool(plainSize),
l: l,
}
}
func (p *encoderPool) Get(metadata []byte) encoding.SeriesEncoder {
fieldSpec, _, err := decodeFieldFlag(metadata)
if err != nil {
p.l.Err(err).Msg("failed to decode field flag")
return p.defaultPool.Get(metadata)
}
if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
return p.intPool.Get(metadata)
}
return p.defaultPool.Get(metadata)
}
func (p *encoderPool) Put(encoder encoding.SeriesEncoder) {
p.intPool.Put(encoder)
p.defaultPool.Put(encoder)
}
type decoderPool struct {
intPool encoding.SeriesDecoderPool
defaultPool encoding.SeriesDecoderPool
l *logger.Logger
}
func newDecoderPool(plainSize, intSize int, l *logger.Logger) encoding.SeriesDecoderPool {
return &decoderPool{
intPool: encoding.NewIntDecoderPool(intSize, intervalFn),
defaultPool: encoding.NewPlainDecoderPool(plainSize),
l: l,
}
}
func (p *decoderPool) Get(metadata []byte) encoding.SeriesDecoder {
fieldSpec, _, err := decodeFieldFlag(metadata)
if err != nil {
p.l.Err(err).Msg("failed to decode field flag")
return p.defaultPool.Get(metadata)
}
if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
return p.intPool.Get(metadata)
}
return p.defaultPool.Get(metadata)
}
func (p *decoderPool) Put(decoder encoding.SeriesDecoder) {
p.intPool.Put(decoder)
p.defaultPool.Put(decoder)
}
const fieldFlagLength = 9
func encoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) []byte {
encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
bb := make([]byte, fieldFlagLength)
bb[0] = encodingMethod<<4 | compressionMethod
copy(bb[1:], convert.Int64ToBytes(int64(interval)))
return bb
}
func decodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) {
if len(key) < fieldFlagLength {
return nil, 0, ErrMalformedFieldFlag
}
b := key[len(key)-9:]
return &databasev1.FieldSpec{
EncodingMethod: databasev1.EncodingMethod(int32(b[0]) >> 4),
CompressionMethod: databasev1.CompressionMethod((int32(b[0] & 0x0F))),
}, time.Duration(convert.BytesToInt64(b[1:])), nil
}