blob: 3799195525ae6c14366fdef8ad8ed2390540f49c [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package storage
import (
"context"
"fmt"
"os"
"path"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var testSeriesPool pbv1.SeriesPool
func TestSeriesIndex_Primary(t *testing.T) {
ctx := context.Background()
path, fn := setUp(require.New(t))
si, err := newSeriesIndex(ctx, path, time.Now(), 0)
require.NoError(t, err)
defer func() {
require.NoError(t, si.Close())
fn()
}()
var docs index.Documents
for i := 0; i < 100; i++ {
series := testSeriesPool.Generate()
series.Subject = "service_instance_latency"
series.EntityValues = []*modelv1.TagValue{
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc_%d", i)}}},
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc_%d_instance_%d", i, i)}}},
}
// Initialize test data
if err = series.Marshal(); err != nil {
t.Fatalf("Failed to marshal series: %v", err)
}
require.True(t, series.ID > 0)
doc := index.Document{
DocID: uint64(series.ID),
EntityValues: make([]byte, len(series.Buffer)),
}
copy(doc.EntityValues, series.Buffer)
docs = append(docs, doc)
testSeriesPool.Release(series)
}
require.NoError(t, si.Write(docs))
// Restart the index
require.NoError(t, si.Close())
si, err = newSeriesIndex(ctx, path, time.Now(), 0)
require.NoError(t, err)
tests := []struct {
name string
subject string
entityValues []*modelv1.TagValue
expected []*modelv1.TagValue
}{
{
name: "Search",
subject: "service_instance_latency",
entityValues: []*modelv1.TagValue{
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
},
expected: []*modelv1.TagValue{
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
},
},
{
name: "Prefix",
subject: "service_instance_latency",
entityValues: []*modelv1.TagValue{
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
pbv1.AnyTagValue,
},
expected: []*modelv1.TagValue{
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
},
},
{
name: "Wildcard",
subject: "service_instance_latency",
entityValues: []*modelv1.TagValue{
pbv1.AnyTagValue,
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
},
expected: []*modelv1.TagValue{
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
seriesQuery := testSeriesPool.Generate()
defer testSeriesPool.Release(seriesQuery)
seriesQuery.Subject = tt.subject
seriesQuery.EntityValues = tt.entityValues
sl, err := si.searchPrimary(ctx, seriesQuery)
require.NoError(t, err)
require.Equal(t, 1, len(sl))
assert.Equal(t, tt.subject, sl[0].Subject)
assert.Equal(t, tt.expected[0].GetStr().GetValue(), sl[0].EntityValues[0].GetStr().GetValue())
assert.Equal(t, tt.expected[1].GetStr().GetValue(), sl[0].EntityValues[1].GetStr().GetValue())
assert.True(t, sl[0].ID > 0)
})
}
}
func setUp(t *require.Assertions) (tempDir string, deferFunc func()) {
t.NoError(logger.Init(logger.Logging{
Env: "dev",
Level: flags.LogLevel,
}))
tempDir, deferFunc = test.Space(t)
return tempDir, deferFunc
}
func TestSeriesIndexController(t *testing.T) {
ttl := IntervalRule{
Unit: DAY,
Num: 3,
}
t.Run("Test setup", func(t *testing.T) {
ctx := context.Background()
tmpDir, dfFn, err := test.NewSpace()
require.NoError(t, err)
defer dfFn()
opts := TSDBOpts[TSTable, any]{
Location: tmpDir,
TTL: ttl,
}
sic, err := newSeriesIndexController(ctx, opts)
assert.NoError(t, err)
assert.NotNil(t, sic)
idxNames := make([]string, 0)
walkDir(tmpDir, "idx-", func(suffix string) error {
idxNames = append(idxNames, suffix)
return nil
})
assert.Equal(t, 1, len(idxNames))
require.NoError(t, sic.Close())
sic, err = newSeriesIndexController(ctx, opts)
assert.NoError(t, err)
assert.NotNil(t, sic)
idxNames = idxNames[:0]
walkDir(tmpDir, "idx-", func(suffix string) error {
idxNames = append(idxNames, suffix)
return nil
})
assert.Equal(t, 1, len(idxNames))
require.NoError(t, sic.Close())
require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-20000)), 0o755))
require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-10000)), 0o755))
sic, err = newSeriesIndexController(ctx, opts)
assert.NoError(t, err)
assert.NotNil(t, sic)
idxNames = idxNames[:0]
walkDir(tmpDir, "idx-", func(suffix string) error {
idxNames = append(idxNames, suffix)
return nil
})
assert.Equal(t, 2, len(idxNames))
require.NoError(t, sic.Close())
})
t.Run("Test retention", func(t *testing.T) {
scenarios := []struct {
name string
now string
}{
{"more than one hour before a new day", "2024-04-24 22:30:00"},
{"more than one hour after a new day", "2024-04-25 01:30:00"},
{"equal one hour after a new day", "2024-04-25 01:00:00"},
{"less one hour after a new day", "2024-04-25 00:50:00"},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
ctx := context.Background()
c := timestamp.NewMockClock()
now, err := time.ParseInLocation("2006-01-02 15:04:05", scenario.now, time.Local)
require.NoError(t, err)
c.Set(now)
ctx = timestamp.SetClock(ctx, c)
tmpDir, dfFn, err := test.NewSpace()
require.NoError(t, err)
defer dfFn()
opts := TSDBOpts[TSTable, any]{
Location: tmpDir,
TTL: ttl,
}
sic, err := newSeriesIndexController(ctx, opts)
require.NoError(t, err)
defer sic.Close()
c.Set(now.Add(-time.Hour*23 + 10*time.Minute))
require.NoError(t, sic.run(c.Now()))
sic.RLock()
standby := sic.standby
sic.RUnlock()
require.NotNil(t, standby)
idxNames := make([]string, 0)
walkDir(tmpDir, "idx-", func(suffix string) error {
idxNames = append(idxNames, suffix)
return nil
})
assert.Equal(t, 2, len(idxNames))
nextTime := standby.startTime
c.Set(now.Add(time.Hour))
require.NoError(t, sic.run(c.Now()))
sic.RLock()
standby = sic.standby
hot := sic.hot
sic.RUnlock()
require.Nil(t, standby)
assert.Equal(t, nextTime, hot.startTime)
})
}
scenarios = []struct {
name string
now string
}{
{"less one hour before a new day", "2024-04-24 23:10:00"},
{"equal one hour before a new day", "2024-04-24 23:00:00"},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
ctx := context.Background()
c := timestamp.NewMockClock()
now, err := time.ParseInLocation("2006-01-02 15:04:05", scenario.now, time.Local)
require.NoError(t, err)
c.Set(now)
ctx = timestamp.SetClock(ctx, c)
tmpDir, dfFn, err := test.NewSpace()
require.NoError(t, err)
defer dfFn()
opts := TSDBOpts[TSTable, any]{
Location: tmpDir,
TTL: ttl,
}
sic, err := newSeriesIndexController(ctx, opts)
require.NoError(t, err)
defer sic.Close()
c.Set(now.Add(-time.Hour*23 + 10*time.Minute))
require.NoError(t, sic.run(c.Now()))
sic.RLock()
standby := sic.standby
sic.RUnlock()
require.Nil(t, standby)
idxNames := make([]string, 0)
walkDir(tmpDir, "idx-", func(suffix string) error {
idxNames = append(idxNames, suffix)
return nil
})
assert.Equal(t, 1, len(idxNames))
})
}
})
}