blob: 51f16039d76a9a73cef2caee7f84f66d7678d4c7 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package trace
import (
"context"
"os"
"path"
"sort"
"strings"
"testing"
"time"
googleUUID "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
"github.com/apache/skywalking-banyandb/pkg/pb"
)
// ByEntityID is a slice of entities that sorts in ascending lexicographic
// order of entity ID.
type ByEntityID []data.Entity

// Compile-time check that ByEntityID implements sort.Interface.
var _ sort.Interface = ByEntityID(nil)

// Len reports how many entities the slice holds.
func (b ByEntityID) Len() int { return len(b) }

// Less reports whether the entity ID at index i sorts before the one at index j.
func (b ByEntityID) Less(i, j int) bool {
	left, right := b[i].GetEntityId(), b[j].GetEntityId()
	return strings.Compare(left, right) < 0
}

// Swap exchanges the entities at indexes i and j.
func (b ByEntityID) Swap(i, j int) {
	b[j], b[i] = b[i], b[j]
}
// setup prepares a trace series for a test.
//
// It bootstraps the logger, creates a storage DB whose root path is a fresh
// "banyandb-<uuid>" directory under os.TempDir(), creates the trace service on
// top of that DB, runs PreRun on the service and then on the DB, and finally
// looks up the series with metadata name "sw" in group "default".
//
// The returned function stops the DB gracefully and removes the temporary
// directory; callers are expected to invoke it once the test is finished.
//
// Any failing step stops the calling test immediately (t.FailNow), so a later
// step never runs against a nil or half-initialised value. setup must therefore
// be called from the goroutine running the test.
func setup(t *testing.T) (*traceSeries, func()) {
	t.Helper()

	// must reports err through assert (keeping its rich failure output) and
	// then aborts the test, because every following step depends on this one.
	must := func(err error) {
		t.Helper()
		if !assert.NoError(t, err) {
			t.FailNow()
		}
	}

	// The result of Bootstrap is deliberately discarded, as before.
	_ = logger.Bootstrap()

	db, err := storage.NewDB(context.TODO(), nil)
	must(err)

	uuid, err := googleUUID.NewUUID()
	must(err)
	rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
	must(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))

	svc, err := NewService(context.TODO(), db, nil)
	must(err)
	must(svc.PreRun())
	must(db.PreRun())

	// Checked assertion: fail with a readable message instead of panicking if
	// NewService ever returns a different implementation.
	traceSVC, ok := svc.(*service)
	if !ok {
		t.Fatalf("NewService() returned %T, want *service", svc)
	}

	ts, err := traceSVC.getSeries(common.Metadata{
		Spec: &v1.Metadata{
			Name:  "sw",
			Group: "default",
		},
	})
	must(err)

	return ts, func() {
		db.GracefulStop()
		// Best-effort removal of the temporary directory.
		_ = os.RemoveAll(rootPath)
	}
}
// seriesEntity pairs a series identifier with the entity to write under it.
// setupTestData hashes seriesID to obtain the series ID and to pick the shard
// for the write.
type seriesEntity struct {
// seriesID is the raw series name, e.g. "webapp_10.0.0.1".
seriesID string
// entity carries the ID, binary payload, timestamp and fields of the value.
entity entity
}
// wantEntity describes an entity a test expects to read back.
// NOTE(review): no use of this type is visible in this part of the file; the
// field notes below are inferred from the field names and types only — confirm
// against the tests that consume it.
type wantEntity struct {
// entityID is presumably the expected entity ID.
entityID string
// dataBinary is presumably the expected binary payload.
dataBinary []byte
// fieldsSize is presumably the expected number of fields.
fieldsSize int
}
// interval is the gap (half a second) between the timestamps of consecutive
// entities produced by testData.
var interval = time.Second / 2
// testData returns the fixture written by the tests: seven entities, each
// under its own series ID, with entity IDs "1".."7" and one-byte payloads
// 11..17. The first entity is stamped with t and every following one is
// interval later than its predecessor.
//
// Field values are positional and are passed through unchanged to
// getEntityWithTS; nil marks a field that is left unset for that entity.
// NOTE(review): the field order presumably follows the schema of the series
// under test — confirm against the series definition.
func testData(t time.Time) []seriesEntity {
	// row assembles one fixture element from its series ID, entity ID,
	// single-byte payload, timestamp and positional field values.
	row := func(seriesID, entityID string, payload byte, at time.Time, fields ...interface{}) seriesEntity {
		return seriesEntity{
			seriesID: seriesID,
			entity:   getEntityWithTS(entityID, []byte{payload}, at, fields...),
		}
	}

	return []seriesEntity{
		row("webapp_10.0.0.1", "1", 11, t,
			"trace_id-xxfff.111323", 0,
			"webapp_id", "10.0.0.1_id", "/home_id",
			"webapp", "10.0.0.1", "/home",
			300, 1622933202000000000,
		),
		row("gateway_10.0.0.2", "2", 12, t.Add(interval),
			"trace_id-xxfff.111323a", 1,
		),
		row("httpserver_10.0.0.3", "3", 13, t.Add(interval*2),
			"trace_id-xxfff.111323", 1,
			"httpserver_id", "10.0.0.3_id", "/home_id",
			"httpserver", "10.0.0.3", "/home",
			300, 1622933202000000000,
			"GET", "200",
		),
		row("database_10.0.0.4", "4", 14, t.Add(interval*3),
			"trace_id-xxfff.111323", 0,
			"database_id", "10.0.0.4_id", "/home_id",
			"database", "10.0.0.4", "/home",
			300, 1622933202000000000,
			nil, nil,
			"MySQL", "10.1.1.4",
		),
		row("mq_10.0.0.5", "5", 15, t.Add(interval*4),
			"trace_id-zzpp.111323", 0,
			"mq_id", "10.0.0.5_id", "/home_id",
			"mq", "10.0.0.5", "/home",
			300, 1622933202000000000,
			nil, nil,
			nil, nil,
			"test_topic", "10.0.0.5",
		),
		row("database_10.0.0.6", "6", 16, t.Add(interval*5),
			"trace_id-zzpp.111323", 1,
			"database_id", "10.0.0.6_id", "/home_id",
			"database", "10.0.0.6", "/home",
			300, 1622933202000000000,
			nil, nil,
			"MySQL", "10.1.1.6",
		),
		// NOTE(review): "nq_id" looks like it may be a typo for "mq_id"; it is
		// kept verbatim because tests may depend on the value — confirm.
		row("mq_10.0.0.7", "7", 17, t.Add(interval*6),
			"trace_id-zzpp.111323", 0,
			"nq_id", "10.0.0.7_id", "/home_id",
			"mq", "10.0.0.7", "/home",
			300, 1622933202000000000,
			nil, nil,
			nil, nil,
			"test_topic", "10.0.0.7",
		),
	}
}
// entity is the test-side description of a single value to write.
// setupTestData feeds its members to the entity-value builder: id as the
// entity ID, binary as the data binary, t as the timestamp and items as the
// positional fields.
type entity struct {
	id     string
	binary []byte
	t      time.Time
	items  []interface{}
}

// getEntity builds an entity from id, binary and the given field values,
// leaving the timestamp at the zero time.Time.
func getEntity(id string, binary []byte, items ...interface{}) entity {
	return getEntityWithTS(id, binary, time.Time{}, items...)
}

// getEntityWithTS builds an entity from id, binary, the timestamp t and the
// given field values. The binary and items slices are stored as passed, not
// copied.
func getEntityWithTS(id string, binary []byte, t time.Time, items ...interface{}) entity {
	var e entity
	e.id, e.binary = id, binary
	e.t = t
	e.items = items
	return e
}
// setupTestData writes every element of seriesEntities into ts and returns one
// idWithShard per input element, in input order, holding the ID returned by
// Write and the shard the element was written to.
//
// For each element the series name is hashed into the series ID and mapped to
// a shard, and the entity's binary, ID, timestamp and fields are packed into
// an entity value. A failed write, or a returned ID below 1, is reported on t
// together with the series name; the loop keeps going and still appends a
// result so positions in the returned slice keep matching the input.
func setupTestData(t *testing.T, ts *traceSeries, seriesEntities []seriesEntity) (results []idWithShard) {
	t.Helper()
	results = make([]idWithShard, 0, len(seriesEntities))
	for _, se := range seriesEntities {
		seriesID := []byte(se.seriesID)
		ev := pb.NewEntityValueBuilder().
			DataBinary(se.entity.binary).
			EntityID(se.entity.id).
			Timestamp(se.entity.t).
			Fields(se.entity.items...).
			Build()
		// NOTE(review): 2 is presumably the shard count of the series under
		// test — confirm against the series configuration.
		shardID := partition.ShardID(seriesID, 2)
		got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), shardID, data.EntityValue{
			EntityValue: ev,
		})
		switch {
		case err != nil:
			// Surface the underlying error and the series it belongs to.
			t.Errorf("Write() for series %q got error: %v", se.seriesID, err)
		case got < 1:
			t.Errorf("Write() for series %q got empty chunkID", se.seriesID)
		}
		results = append(results, idWithShard{
			id:      got,
			shardID: shardID,
		})
	}
	return results
}