blob: 7e923767e115956c979defab61be08495bbdf3ba [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package schema
import (
"context"
"io"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"go.uber.org/multierr"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)
// EventType is the kind of change carried by a MetadataEvent (see MetadataEvent.Typ).
type EventType uint8

const (
	// EventAddOrUpdate makes the Watcher (re)load the object: StoreGroup for groups,
	// storeResource for resources.
	EventAddOrUpdate EventType = iota
	// EventDelete makes the Watcher drop the object: deleteGroup for groups,
	// deleteResource for resources.
	EventDelete
)

// EventKind tells which schema object a MetadataEvent refers to (see MetadataEvent.Kind).
type EventKind uint8

const (
	// EventKindGroup means the event's Metadata names a group.
	EventKindGroup EventKind = iota
	// EventKindResource means the event's Metadata names a resource; its Group field
	// selects the owning group.
	EventKindResource
)
// Group is a loaded group together with the resources stored in it.
// In this file it is implemented by *group.
type Group interface {
	// GetSchema returns the group schema the group currently reflects.
	GetSchema() *commonv1.Group
	// StoreResource opens (or reuses) the resource described by resourceSchema and
	// caches it in the group under its metadata name.
	StoreResource(resourceSchema ResourceSchema) (Resource, error)
	// LoadResource looks up a cached resource by name; the bool reports whether it was found.
	LoadResource(name string) (Resource, bool)
}
// MetadataEvent is a change notification passed to Repository.SendMetadataEvent
// and applied by the Watcher loop.
type MetadataEvent struct {
	// Typ selects add/update versus delete.
	Typ EventType
	// Kind selects whether Metadata names a group or a resource.
	Kind EventKind
	// Metadata identifies the changed group or resource.
	Metadata *commonv1.Metadata
}
// ResourceSchema is the minimal contract of a resource definition: it exposes its
// metadata. Groups cache resources by the metadata name and compare mod revisions
// to decide whether a cached resource is stale.
type ResourceSchema interface {
	GetMetadata() *commonv1.Metadata
}

// ResourceSpec bundles what ResourceSupplier.OpenResource needs: the resource
// schema and the index rules fetched for it from the metadata registry.
type ResourceSpec struct {
	Schema     ResourceSchema
	IndexRules []*databasev1.IndexRule
}

// Resource is an opened resource cached by a group. A group closes it when a newer
// instance replaces it, when it is deleted, or when the group itself is closed.
type Resource interface {
	// GetIndexRules returns the index rules the resource holds (presumably the
	// ResourceSpec.IndexRules it was opened with; confirm in implementations).
	GetIndexRules() []*databasev1.IndexRule
	// MaxObservedModRevision is compared with pbv1.ParseMaxModRevision of freshly
	// fetched index rules to decide whether the resource must be reopened.
	MaxObservedModRevision() int64
	// EntityLocator returns the tag locators that are published in entity events.
	EntityLocator() partition.EntityLocator
	ResourceSchema
	io.Closer
}
// ResourceSupplier is the pluggable part of the repository. It loads resource
// schemas, opens resources, and opens the database backing each group.
type ResourceSupplier interface {
	// OpenResource opens a resource for a group with shardNum shards. The group
	// passes itself as db and passes the index rules it fetched in spec.
	OpenResource(shardNum uint32, db tsdb.Supplier, spec ResourceSpec) (Resource, error)
	// ResourceSchema loads the schema of the resource identified by metdata. The
	// repository passes its own metadata.Repo as repo.
	ResourceSchema(repo metadata.Repo, metdata *commonv1.Metadata) (ResourceSchema, error)
	// OpenDB opens the database for a group. StoreGroup calls it for a new group
	// and again when a newer group revision arrives.
	OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error)
}
// Repository keeps an in-memory cache of groups and their resources in sync with
// the metadata registry. It publishes shard and entity events when they change.
type Repository interface {
	// Watcher is the event loop. It blocks until the event channel is closed or the
	// repository is closed.
	Watcher()
	// SendMetadataEvent hands an event to the Watcher.
	SendMetadataEvent(MetadataEvent)
	// StoreGroup loads a group from the metadata registry and opens (or reopens) its database.
	// NOTE(review): it returns the unexported *group, which callers outside this
	// package cannot name.
	StoreGroup(groupMeta *commonv1.Metadata) (*group, error)
	// LoadGroup returns a cached group by name.
	LoadGroup(name string) (Group, bool)
	// LoadResource returns a cached resource identified by metadata's group and name.
	LoadResource(metadata *commonv1.Metadata) (Resource, bool)
	// NotifyAll republishes PUT events for every cached group and resource.
	NotifyAll() (err error)
	// Close stops the Watcher and closes every cached group.
	Close()
}
var _ Repository = (*schemaRepo)(nil)

// schemaRepo is the Repository implementation. The embedded RWMutex guards data.
type schemaRepo struct {
	sync.RWMutex
	metadata         metadata.Repo
	repo             discovery.ServiceRepo
	l                *logger.Logger
	resourceSupplier ResourceSupplier
	// shardTopic receives one ShardEvent per shard whenever a group is put or deleted.
	shardTopic bus.Topic
	// entityTopic is handed to every group, which publishes its EntityEvents there.
	entityTopic bus.Topic
	// data maps a group name to its cached group.
	data map[string]*group
	// stop channel for the inner worker: Close closes it to end the Watcher loop.
	workerStopCh chan struct{}
	// eventCh carries events from SendMetadataEvent to the Watcher.
	eventCh chan MetadataEvent
}
// NewRepository builds an empty Repository on top of the given metadata registry
// and discovery service. The event channel is unbuffered. Watcher blocks, so it is
// presumably run in its own goroutine by the caller.
func NewRepository(
	metadata metadata.Repo,
	repo discovery.ServiceRepo,
	l *logger.Logger,
	resourceSupplier ResourceSupplier,
	shardTopic bus.Topic,
	entityTopic bus.Topic,
) Repository {
	sr := &schemaRepo{
		data:         make(map[string]*group),
		eventCh:      make(chan MetadataEvent),
		workerStopCh: make(chan struct{}),
	}
	sr.metadata = metadata
	sr.repo = repo
	sr.l = l
	sr.resourceSupplier = resourceSupplier
	sr.shardTopic = shardTopic
	sr.entityTopic = entityTopic
	return sr
}
// SendMetadataEvent hands event to the Watcher loop. eventCh is created unbuffered,
// so the call blocks until the Watcher receives the event. Because Close closes
// eventCh, sending after Close panics.
func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) {
	ch := sr.eventCh
	ch <- event
}
// Watcher is the repository's event loop. It applies every MetadataEvent received
// on eventCh. It returns once eventCh or workerStopCh is closed; Close closes both.
//
// A failed event is attempted up to 10 times, one second apart. The wait between
// attempts is abandoned as soon as the repository is stopped, so Close is never
// held up by a pending retry. No wait happens after the final attempt. A panic in
// the loop is recovered and logged, which also ends the loop.
func (sr *schemaRepo) Watcher() {
	defer func() {
		if err := recover(); err != nil {
			sr.l.Warn().Interface("err", err).Msg("watching the events")
		}
	}()
	const maxAttempts = 10
	for {
		select {
		case evt, more := <-sr.eventCh:
			if !more {
				return
			}
			sr.l.Info().Interface("event", evt).Msg("received an event")
			for i := 0; i < maxAttempts; i++ {
				var err error
				switch evt.Typ {
				case EventAddOrUpdate:
					switch evt.Kind {
					case EventKindGroup:
						_, err = sr.StoreGroup(evt.Metadata)
					case EventKindResource:
						_, err = sr.storeResource(evt.Metadata)
					}
				case EventDelete:
					switch evt.Kind {
					case EventKindGroup:
						err = sr.deleteGroup(evt.Metadata)
					case EventKindResource:
						err = sr.deleteResource(evt.Metadata)
					}
				}
				if err == nil {
					break
				}
				if i == maxAttempts-1 {
					sr.l.Err(err).Interface("event", evt).Int("round", i).Msg("fail to handle the metadata event. give up")
					break
				}
				sr.l.Err(err).Interface("event", evt).Int("round", i).Msg("fail to handle the metadata event. retry...")
				// Back off before the next attempt, but let shutdown interrupt the wait.
				select {
				case <-time.After(time.Second):
				case <-sr.workerStopCh:
					return
				}
			}
		case <-sr.workerStopCh:
			return
		}
	}
}
// StoreGroup fetches the group named by groupMeta from the metadata registry and
// makes sure an open database matches it. The registry lookup is bounded by a
// 5 second timeout.
//
// The first time a group is seen, a database is opened, the group is cached, and
// PUT shard events are published. The group is returned together with any publish
// error. For a cached group, nothing happens unless the fetched schema has a newer
// mod revision. In that case:
//  1. the old database is closed (a close failure is logged);
//  2. DELETE is published for the old schema;
//  3. a new database is opened and swapped in;
//  4. PUT is published for the new schema;
//  5. the cached schema is updated.
//
// NOTE(review): if step 2 or 3 fails, the group keeps pointing at the closed
// database and its old schema. A retry then closes that database again.
func (sr *schemaRepo) StoreGroup(groupMeta *commonv1.Metadata) (*group, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	groupSchema, err := sr.metadata.GroupRegistry().GetGroup(ctx, groupMeta.GetName())
	cancel()
	if err != nil {
		return nil, err
	}
	name := groupSchema.GetMetadata().GetName()
	sr.Lock()
	defer sr.Unlock()
	g, ok := sr.getGroup(name)
	if !ok {
		sr.l.Info().Str("group", name).Msg("creating a tsdb")
		db, errOpen := sr.resourceSupplier.OpenDB(groupSchema)
		if errOpen != nil {
			return nil, errOpen
		}
		g = newGroup(groupSchema, sr.repo, sr.metadata, db, sr.l, sr.resourceSupplier, sr.entityTopic)
		sr.data[name] = g
		return g, sr.notify(groupSchema, databasev1.Action_ACTION_PUT)
	}
	prevGroupSchema := g.groupSchema
	// Only a strictly newer revision in the registry justifies reopening the database.
	if groupSchema.GetMetadata().GetModRevision() <= prevGroupSchema.GetMetadata().GetModRevision() {
		return g, nil
	}
	sr.l.Info().Str("group", name).Msg("closing the previous tsdb")
	if errClose := g.SupplyTSDB().Close(); errClose != nil {
		// Keep going so the new schema is still applied, but don't lose the failure.
		sr.l.Err(errClose).Str("group", name).Msg("fail to close the previous tsdb")
	}
	if err = sr.notify(prevGroupSchema, databasev1.Action_ACTION_DELETE); err != nil {
		return nil, err
	}
	sr.l.Info().Str("group", name).Msg("creating a new tsdb")
	newDB, err := sr.resourceSupplier.OpenDB(groupSchema)
	if err != nil {
		return nil, err
	}
	g.setDB(newDB)
	if err = sr.notify(groupSchema, databasev1.Action_ACTION_PUT); err != nil {
		return nil, err
	}
	g.groupSchema = groupSchema
	return g, nil
}
// deleteGroup closes the group named by groupMeta and removes it from the cache.
// An unknown group is not an error. If closing fails, the group stays cached and
// the error is returned, so the Watcher retries.
//
// Publishing the DELETE shard events is best effort. A failure is logged, not
// returned, because returning it would make the Watcher retry g.close() on a
// group that is already closed.
func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error {
	name := groupMeta.GetName()
	sr.Lock()
	defer sr.Unlock()
	g, ok := sr.getGroup(name)
	if !ok {
		return nil
	}
	if err := g.close(); err != nil {
		return err
	}
	if err := sr.notify(g.groupSchema, databasev1.Action_ACTION_DELETE); err != nil {
		sr.l.Err(err).Str("group", name).Msg("fail to notify the deletion of the group")
	}
	delete(sr.data, name)
	return nil
}
// getGroup looks up a cached group by name and reports whether a non-nil entry
// exists. Every caller in this file holds sr's lock around it, because data is a
// plain map.
func (sr *schemaRepo) getGroup(name string) (*group, bool) {
	if g := sr.data[name]; g != nil {
		return g, true
	}
	return nil, false
}
// LoadGroup returns the cached group called name. It takes the read lock, so it
// is safe to call while StoreGroup or deleteGroup (which take the write lock) run.
//
// On a miss it returns a nil Group. A nil *group converted into the Group
// interface would be a non-nil interface holding a nil pointer, so the miss is
// returned explicitly as a literal nil.
func (sr *schemaRepo) LoadGroup(name string) (Group, bool) {
	sr.RLock()
	defer sr.RUnlock()
	g, ok := sr.getGroup(name)
	if !ok {
		return nil, false
	}
	return g, true
}
// LoadResource returns the cached resource identified by metadata's group and
// name, and reports whether it was found. The nil-safe proto getters are used, so
// a nil metadata is looked up as empty names instead of panicking.
func (sr *schemaRepo) LoadResource(metadata *commonv1.Metadata) (Resource, bool) {
	g, ok := sr.LoadGroup(metadata.GetGroup())
	if !ok {
		return nil, false
	}
	return g.LoadResource(metadata.GetName())
}
// storeResource loads the schema of the resource identified by metadata and
// stores it in its (already cached) group.
//
// It fails when the group is not cached yet or the schema cannot be loaded. The
// loader's error is wrapped, not dropped, so the root cause reaches the Watcher's
// retry log.
func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) (Resource, error) {
	g, ok := sr.LoadGroup(metadata.GetGroup())
	if !ok {
		return nil, errors.Errorf("unknown group %q", metadata.GetGroup())
	}
	stm, err := sr.resourceSupplier.ResourceSchema(sr.metadata, metadata)
	if err != nil {
		return nil, errors.Wrap(err, "fails to get the resource")
	}
	return g.StoreResource(stm)
}
// deleteResource removes the resource identified by metadata from its group.
// An unknown group is not an error. The repository read lock is held only for the
// group lookup; the group does its own locking for the removal.
func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) error {
	sr.RLock()
	g, found := sr.getGroup(metadata.Group)
	sr.RUnlock()
	if !found {
		return nil
	}
	return g.deleteResource(metadata)
}
// notify publishes one ShardEvent per shard of groupSchema on shardTopic. All
// events share the same action, timestamp, and message ID. Each shard's node
// carries this node's ID and the address "localhost".
//
// A missing topic is not an error: the first bus.ErrTopicNotExist makes notify
// return nil at once. Any other publish errors are combined and returned.
func (sr *schemaRepo) notify(groupSchema *commonv1.Group, action databasev1.Action) (err error) {
	now := time.Now()
	stamp := timestamppb.New(now)
	total := groupSchema.GetResourceOpts().GetShardNum()
	groupName := groupSchema.GetMetadata().GetName()
	for i := 0; i < int(total); i++ {
		event := &databasev1.ShardEvent{
			Shard: &databasev1.Shard{
				Id:    uint64(i),
				Total: total,
				Metadata: &commonv1.Metadata{
					Name: groupName,
				},
				Node: &databasev1.Node{
					Id:        sr.repo.NodeID(),
					CreatedAt: stamp,
					UpdatedAt: stamp,
					Addr:      "localhost",
				},
				UpdatedAt: stamp,
				CreatedAt: stamp,
			},
			Time:   stamp,
			Action: action,
		}
		_, pubErr := sr.repo.Publish(sr.shardTopic, bus.NewMessage(bus.MessageID(now.UnixNano()), event))
		switch {
		case errors.Is(pubErr, bus.ErrTopicNotExist):
			return nil
		case pubErr != nil:
			err = multierr.Append(err, pubErr)
		}
	}
	return err
}
// NotifyAll republishes PUT events for everything cached: shard events for every
// group and entity events for every resource in it. It holds the repository read
// lock for the whole walk and each group's map read lock while that group's
// resources are published. All failures are combined into the returned error.
func (sr *schemaRepo) NotifyAll() (err error) {
	sr.RLock()
	defer sr.RUnlock()
	publishResources := func(grp *group) (errs error) {
		grp.mapMutex.RLock()
		defer grp.mapMutex.RUnlock()
		for _, res := range grp.schemaMap {
			errs = multierr.Append(errs, grp.notify(res, databasev1.Action_ACTION_PUT))
		}
		return errs
	}
	for _, grp := range sr.data {
		groupErr := sr.notify(grp.groupSchema, databasev1.Action_ACTION_PUT)
		err = multierr.Append(err, groupErr)
		err = multierr.Append(err, publishResources(grp))
	}
	return err
}
// Close shuts the repository down. It closes eventCh and then workerStopCh, which
// ends the Watcher loop, and then closes every cached group under the read lock.
// A group close failure is logged, not returned; groups stay in data afterwards.
// A panic, such as closing the channels again on a repeated Close, is recovered
// and logged as a warning.
func (sr *schemaRepo) Close() {
	defer func() {
		if r := recover(); r != nil {
			sr.l.Warn().Interface("err", r).Msg("closing resource")
		}
	}()
	if ch := sr.eventCh; ch != nil {
		close(ch)
	}
	if stop := sr.workerStopCh; stop != nil {
		close(stop)
	}
	sr.RLock()
	defer sr.RUnlock()
	for _, g := range sr.data {
		if err := g.close(); err != nil {
			sr.l.Err(err).Stringer("group", g.groupSchema.Metadata).Msg("closing")
		}
	}
}
var _ Group = (*group)(nil)

// group is the Group implementation cached by schemaRepo. It owns one database
// and the resources opened inside it.
type group struct {
	// groupSchema is the schema the group currently reflects. StoreGroup replaces it
	// after a successful reopen.
	groupSchema      *commonv1.Group
	l                *logger.Logger
	resourceSupplier ResourceSupplier
	repo             discovery.ServiceRepo
	metadata         metadata.Repo
	// entityTopic is where notify publishes EntityEvents.
	entityTopic bus.Topic
	// db holds the current tsdb.Database. setDB swaps it and SupplyTSDB reads it.
	db atomic.Value
	// mapMutex guards schemaMap.
	mapMutex  sync.RWMutex
	schemaMap map[string]Resource
}
// newGroup assembles a group around an already opened database. The group starts
// with no resources, and db is what SupplyTSDB returns until setDB replaces it.
func newGroup(
	groupSchema *commonv1.Group,
	repo discovery.ServiceRepo,
	metadata metadata.Repo,
	db tsdb.Database,
	l *logger.Logger,
	resourceSupplier ResourceSupplier,
	entityTopic bus.Topic,
) *group {
	g := new(group)
	g.groupSchema = groupSchema
	g.l = l
	g.resourceSupplier = resourceSupplier
	g.repo = repo
	g.metadata = metadata
	g.entityTopic = entityTopic
	g.schemaMap = make(map[string]Resource)
	g.db.Store(db)
	return g
}
// GetSchema returns the group schema the group was last built or reopened with.
func (g *group) GetSchema() *commonv1.Group { return g.groupSchema }

// SupplyTSDB returns the database currently stored in g.db. The type assertion
// panics if nothing was ever stored; newGroup always stores one.
func (g *group) SupplyTSDB() tsdb.Database {
	current := g.db.Load()
	return current.(tsdb.Database)
}

// setDB swaps in db as the group's database; later SupplyTSDB calls return it.
func (g *group) setDB(db tsdb.Database) { g.db.Store(db) }
// StoreResource caches the resource described by resourceSchema in the group. It
// reuses the cached instance of the same name when nothing relevant changed.
//
// The index rules are fetched once from the metadata registry, bounded by a
// 5 second timeout. The cached resource is kept when all of these hold:
//   - its mod revision is not older than resourceSchema's;
//   - it has as many index rules as the registry now reports;
//   - its MaxObservedModRevision is not older than the newest of those rules.
//
// Otherwise a new resource is opened with the group's shard count and the fetched
// rules. A PUT entity event is published for it, it replaces the cached entry, and
// the previous instance is closed. If publishing fails, the new resource is closed
// again instead of leaking, and the cache is left untouched.
func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) {
	g.mapMutex.Lock()
	defer g.mapMutex.Unlock()
	key := resourceSchema.GetMetadata().GetName()
	preResource := g.schemaMap[key]
	// The rules are needed both to judge the cached resource and to open a new one,
	// so a single registry round-trip serves both.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	idxRules, errIndexRules := g.metadata.IndexRules(ctx, resourceSchema.GetMetadata())
	cancel()
	if errIndexRules != nil {
		return nil, errIndexRules
	}
	if preResource != nil &&
		resourceSchema.GetMetadata().GetModRevision() <= preResource.GetMetadata().GetModRevision() &&
		len(idxRules) == len(preResource.GetIndexRules()) &&
		preResource.MaxObservedModRevision() >= pbv1.ParseMaxModRevision(idxRules) {
		return preResource, nil
	}
	sm, errTS := g.resourceSupplier.OpenResource(g.groupSchema.GetResourceOpts().GetShardNum(), g, ResourceSpec{
		Schema:     resourceSchema,
		IndexRules: idxRules,
	})
	if errTS != nil {
		return nil, errTS
	}
	if err := g.notify(sm, databasev1.Action_ACTION_PUT); err != nil {
		// sm was neither published nor cached; close it so it does not leak.
		return nil, multierr.Append(err, sm.Close())
	}
	g.schemaMap[key] = sm
	if preResource != nil {
		// Best effort: the replacement is already live, so failing to close the old
		// instance does not fail the store.
		_ = preResource.Close()
	}
	return sm, nil
}
// deleteResource publishes a DELETE entity event for the cached resource named by
// metadata, then evicts and closes it. A missing resource is not an error. If the
// publish fails, the resource stays cached and the error is returned. The close
// error is discarded.
func (g *group) deleteResource(metadata *commonv1.Metadata) error {
	g.mapMutex.Lock()
	defer g.mapMutex.Unlock()
	name := metadata.GetName()
	res := g.schemaMap[name]
	if res == nil {
		return nil
	}
	err := g.notify(res, databasev1.Action_ACTION_DELETE)
	if err != nil {
		return err
	}
	delete(g.schemaMap, name)
	_ = res.Close()
	return nil
}
// LoadResource returns the cached resource called name, read under the map read
// lock. A missing or nil entry reports false.
func (g *group) LoadResource(name string) (Resource, bool) {
	g.mapMutex.RLock()
	defer g.mapMutex.RUnlock()
	if res := g.schemaMap[name]; res != nil {
		return res, true
	}
	return nil, false
}
// notify publishes an EntityEvent for resource on the group's entity topic. The
// event carries the resource metadata as the subject and its entity locator as
// (family offset, tag offset) pairs. A missing topic (bus.ErrTopicNotExist) counts
// as success.
func (g *group) notify(resource Resource, action databasev1.Action) error {
	now := time.Now()
	stamp := timestamppb.New(now)
	tagLocators := resource.EntityLocator()
	locators := make([]*databasev1.EntityEvent_TagLocator, 0, len(tagLocators))
	for _, tl := range tagLocators {
		locators = append(locators, &databasev1.EntityEvent_TagLocator{
			FamilyOffset: uint32(tl.FamilyOffset),
			TagOffset:    uint32(tl.TagOffset),
		})
	}
	event := &databasev1.EntityEvent{
		Subject:       resource.GetMetadata(),
		EntityLocator: locators,
		Time:          stamp,
		Action:        action,
	}
	_, err := g.repo.Publish(g.entityTopic, bus.NewMessage(bus.MessageID(now.UnixNano()), event))
	if err != nil && !errors.Is(err, bus.ErrTopicNotExist) {
		return err
	}
	return nil
}
// close closes every cached resource, under the map read lock, and then the
// group's database. All failures are combined into the returned error. The
// resources stay in schemaMap.
func (g *group) close() error {
	resourcesErr := func() (errs error) {
		g.mapMutex.RLock()
		defer g.mapMutex.RUnlock()
		for _, res := range g.schemaMap {
			errs = multierr.Append(errs, res.Close())
		}
		return errs
	}()
	dbErr := g.SupplyTSDB().Close()
	return multierr.Append(resourcesErr, dbErr)
}