| // Licensed to Apache Software Foundation (ASF) under one or more contributor |
| // license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright |
| // ownership. Apache Software Foundation (ASF) licenses this file to you under |
| // the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package schema |
| |
| import ( |
| "context" |
| "io" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "github.com/pkg/errors" |
| "go.uber.org/multierr" |
| "google.golang.org/protobuf/types/known/timestamppb" |
| |
| commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" |
| databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" |
| "github.com/apache/skywalking-banyandb/banyand/discovery" |
| "github.com/apache/skywalking-banyandb/banyand/metadata" |
| "github.com/apache/skywalking-banyandb/banyand/tsdb" |
| "github.com/apache/skywalking-banyandb/pkg/bus" |
| "github.com/apache/skywalking-banyandb/pkg/logger" |
| "github.com/apache/skywalking-banyandb/pkg/partition" |
| pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" |
| ) |
| |
| type EventType uint8 |
| |
| const ( |
| EventAddOrUpdate EventType = iota |
| EventDelete |
| ) |
| |
| type EventKind uint8 |
| |
| const ( |
| EventKindGroup EventKind = iota |
| EventKindResource |
| ) |
| |
| type Group interface { |
| GetSchema() *commonv1.Group |
| StoreResource(resourceSchema ResourceSchema) (Resource, error) |
| LoadResource(name string) (Resource, bool) |
| } |
| |
| type MetadataEvent struct { |
| Typ EventType |
| Kind EventKind |
| Metadata *commonv1.Metadata |
| } |
| |
| type ResourceSchema interface { |
| GetMetadata() *commonv1.Metadata |
| } |
| |
| type ResourceSpec struct { |
| Schema ResourceSchema |
| IndexRules []*databasev1.IndexRule |
| } |
| |
| type Resource interface { |
| GetIndexRules() []*databasev1.IndexRule |
| MaxObservedModRevision() int64 |
| EntityLocator() partition.EntityLocator |
| ResourceSchema |
| io.Closer |
| } |
| |
| type ResourceSupplier interface { |
| OpenResource(shardNum uint32, db tsdb.Supplier, spec ResourceSpec) (Resource, error) |
| ResourceSchema(repo metadata.Repo, metdata *commonv1.Metadata) (ResourceSchema, error) |
| OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) |
| } |
| |
| type Repository interface { |
| Watcher() |
| SendMetadataEvent(MetadataEvent) |
| StoreGroup(groupMeta *commonv1.Metadata) (*group, error) |
| LoadGroup(name string) (Group, bool) |
| LoadResource(metadata *commonv1.Metadata) (Resource, bool) |
| NotifyAll() (err error) |
| Close() |
| } |
| |
| var _ Repository = (*schemaRepo)(nil) |
| |
| type schemaRepo struct { |
| sync.RWMutex |
| metadata metadata.Repo |
| repo discovery.ServiceRepo |
| l *logger.Logger |
| resourceSupplier ResourceSupplier |
| shardTopic bus.Topic |
| entityTopic bus.Topic |
| data map[string]*group |
| |
| // stop channel for the inner worker |
| workerStopCh chan struct{} |
| eventCh chan MetadataEvent |
| } |
| |
| func NewRepository( |
| metadata metadata.Repo, |
| repo discovery.ServiceRepo, |
| l *logger.Logger, |
| resourceSupplier ResourceSupplier, |
| shardTopic bus.Topic, |
| entityTopic bus.Topic, |
| ) Repository { |
| return &schemaRepo{ |
| metadata: metadata, |
| repo: repo, |
| l: l, |
| resourceSupplier: resourceSupplier, |
| shardTopic: shardTopic, |
| entityTopic: entityTopic, |
| data: make(map[string]*group), |
| eventCh: make(chan MetadataEvent), |
| workerStopCh: make(chan struct{}), |
| } |
| } |
| |
| func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) { |
| sr.eventCh <- event |
| } |
| |
| func (sr *schemaRepo) Watcher() { |
| defer func() { |
| if err := recover(); err != nil { |
| sr.l.Warn().Interface("err", err).Msg("watching the events") |
| } |
| }() |
| for { |
| select { |
| case evt, more := <-sr.eventCh: |
| if !more { |
| return |
| } |
| sr.l.Info().Interface("event", evt).Msg("received an event") |
| for i := 0; i < 10; i++ { |
| var err error |
| switch evt.Typ { |
| case EventAddOrUpdate: |
| switch evt.Kind { |
| case EventKindGroup: |
| _, err = sr.StoreGroup(evt.Metadata) |
| case EventKindResource: |
| _, err = sr.storeResource(evt.Metadata) |
| } |
| case EventDelete: |
| switch evt.Kind { |
| case EventKindGroup: |
| err = sr.deleteGroup(evt.Metadata) |
| case EventKindResource: |
| err = sr.deleteResource(evt.Metadata) |
| } |
| } |
| if err == nil { |
| break |
| } |
| time.Sleep(time.Second) |
| sr.l.Err(err).Interface("event", evt).Int("round", i).Msg("fail to handle the metadata event. retry...") |
| } |
| case <-sr.workerStopCh: |
| return |
| } |
| } |
| } |
| |
| func (sr *schemaRepo) StoreGroup(groupMeta *commonv1.Metadata) (*group, error) { |
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| groupSchema, err := sr.metadata.GroupRegistry().GetGroup(ctx, groupMeta.GetName()) |
| cancel() |
| if err != nil { |
| return nil, err |
| } |
| name := groupSchema.GetMetadata().GetName() |
| sr.Lock() |
| defer sr.Unlock() |
| g, ok := sr.getGroup(name) |
| if !ok { |
| sr.l.Info().Str("group", name).Msg("creating a tsdb") |
| var db tsdb.Database |
| db, err = sr.resourceSupplier.OpenDB(groupSchema) |
| if err != nil { |
| return nil, err |
| } |
| g = newGroup(groupSchema, sr.repo, sr.metadata, db, sr.l, sr.resourceSupplier, sr.entityTopic) |
| sr.data[name] = g |
| return g, sr.notify(groupSchema, databasev1.Action_ACTION_PUT) |
| } |
| prevGroupSchema := g.groupSchema |
| if groupSchema.GetMetadata().GetModRevision() <= prevGroupSchema.Metadata.ModRevision { |
| return g, nil |
| } |
| sr.l.Info().Str("group", name).Msg("closing the previous tsdb") |
| db := g.SupplyTSDB() |
| db.Close() |
| err = sr.notify(prevGroupSchema, databasev1.Action_ACTION_DELETE) |
| if err != nil { |
| return nil, err |
| } |
| sr.l.Info().Str("group", name).Msg("creating a new tsdb") |
| newDB, err := sr.resourceSupplier.OpenDB(groupSchema) |
| if err != nil { |
| return nil, err |
| } |
| g.setDB(newDB) |
| err = sr.notify(groupSchema, databasev1.Action_ACTION_PUT) |
| if err != nil { |
| return nil, err |
| } |
| g.groupSchema = groupSchema |
| return g, nil |
| } |
| |
| func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error { |
| name := groupMeta.GetName() |
| sr.Lock() |
| defer sr.Unlock() |
| var ok bool |
| g, ok := sr.getGroup(name) |
| if !ok { |
| return nil |
| } |
| err := g.close() |
| if err != nil { |
| return err |
| } |
| _ = sr.notify(g.groupSchema, databasev1.Action_ACTION_DELETE) |
| delete(sr.data, name) |
| return nil |
| } |
| |
| func (sr *schemaRepo) getGroup(name string) (*group, bool) { |
| g := sr.data[name] |
| if g == nil { |
| return nil, false |
| } |
| return g, true |
| } |
| |
| func (sr *schemaRepo) LoadGroup(name string) (Group, bool) { |
| sr.RLock() |
| defer sr.RUnlock() |
| return sr.getGroup(name) |
| } |
| |
| func (sr *schemaRepo) LoadResource(metadata *commonv1.Metadata) (Resource, bool) { |
| g, ok := sr.LoadGroup(metadata.Group) |
| if !ok { |
| return nil, false |
| } |
| return g.LoadResource(metadata.Name) |
| } |
| |
| func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) (Resource, error) { |
| group, ok := sr.LoadGroup(metadata.Group) |
| if !ok { |
| return nil, errors.Errorf("unknown group") |
| } |
| stm, err := sr.resourceSupplier.ResourceSchema(sr.metadata, metadata) |
| if err != nil { |
| return nil, errors.Errorf("fails to get the resource") |
| } |
| return group.StoreResource(stm) |
| } |
| |
| func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) error { |
| g, ok := sr.LoadGroup(metadata.Group) |
| if !ok { |
| return nil |
| } |
| return g.(*group).deleteResource(metadata) |
| } |
| |
| func (sr *schemaRepo) notify(groupSchema *commonv1.Group, action databasev1.Action) (err error) { |
| now := time.Now() |
| nowPb := timestamppb.New(now) |
| shardNum := groupSchema.GetResourceOpts().GetShardNum() |
| for i := 0; i < int(shardNum); i++ { |
| _, errInternal := sr.repo.Publish(sr.shardTopic, bus.NewMessage(bus.MessageID(now.UnixNano()), &databasev1.ShardEvent{ |
| Shard: &databasev1.Shard{ |
| Id: uint64(i), |
| Total: shardNum, |
| Metadata: &commonv1.Metadata{ |
| Name: groupSchema.GetMetadata().GetName(), |
| }, |
| Node: &databasev1.Node{ |
| Id: sr.repo.NodeID(), |
| CreatedAt: nowPb, |
| UpdatedAt: nowPb, |
| Addr: "localhost", |
| }, |
| UpdatedAt: nowPb, |
| CreatedAt: nowPb, |
| }, |
| Time: nowPb, |
| Action: action, |
| })) |
| if errors.Is(errInternal, bus.ErrTopicNotExist) { |
| return nil |
| } |
| if errInternal != nil { |
| err = multierr.Append(err, errInternal) |
| } |
| } |
| return err |
| } |
| |
| func (sr *schemaRepo) NotifyAll() (err error) { |
| sr.RLock() |
| defer sr.RUnlock() |
| for _, g := range sr.data { |
| err = multierr.Append(err, sr.notify(g.groupSchema, databasev1.Action_ACTION_PUT)) |
| g.mapMutex.RLock() |
| for _, s := range g.schemaMap { |
| err = multierr.Append(err, g.notify(s, databasev1.Action_ACTION_PUT)) |
| } |
| g.mapMutex.RUnlock() |
| } |
| return err |
| } |
| |
| func (sr *schemaRepo) Close() { |
| defer func() { |
| if err := recover(); err != nil { |
| sr.l.Warn().Interface("err", err).Msg("closing resource") |
| } |
| }() |
| if sr.eventCh != nil { |
| close(sr.eventCh) |
| } |
| if sr.workerStopCh != nil { |
| close(sr.workerStopCh) |
| } |
| |
| sr.RLock() |
| defer sr.RUnlock() |
| for _, g := range sr.data { |
| err := g.close() |
| if err != nil { |
| sr.l.Err(err).Stringer("group", g.groupSchema.Metadata).Msg("closing") |
| } |
| } |
| } |
| |
| var _ Group = (*group)(nil) |
| |
| type group struct { |
| groupSchema *commonv1.Group |
| l *logger.Logger |
| resourceSupplier ResourceSupplier |
| repo discovery.ServiceRepo |
| metadata metadata.Repo |
| entityTopic bus.Topic |
| db atomic.Value |
| mapMutex sync.RWMutex |
| schemaMap map[string]Resource |
| } |
| |
| func newGroup( |
| groupSchema *commonv1.Group, |
| repo discovery.ServiceRepo, |
| metadata metadata.Repo, |
| db tsdb.Database, |
| l *logger.Logger, |
| resourceSupplier ResourceSupplier, |
| entityTopic bus.Topic, |
| ) *group { |
| g := &group{ |
| groupSchema: groupSchema, |
| repo: repo, |
| metadata: metadata, |
| l: l, |
| schemaMap: make(map[string]Resource), |
| resourceSupplier: resourceSupplier, |
| entityTopic: entityTopic, |
| } |
| g.db.Store(db) |
| return g |
| } |
| |
| func (g *group) GetSchema() *commonv1.Group { |
| return g.groupSchema |
| } |
| |
| func (g *group) SupplyTSDB() tsdb.Database { |
| return g.db.Load().(tsdb.Database) |
| } |
| |
| func (g *group) setDB(db tsdb.Database) { |
| g.db.Store(db) |
| } |
| |
| func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) { |
| g.mapMutex.Lock() |
| defer g.mapMutex.Unlock() |
| key := resourceSchema.GetMetadata().GetName() |
| preResource := g.schemaMap[key] |
| if preResource != nil && |
| resourceSchema.GetMetadata().GetModRevision() <= preResource.GetMetadata().GetModRevision() { |
| // we only need to check the max modifications revision observed for index rules |
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| idxRules, errIndexRules := g.metadata.IndexRules(ctx, resourceSchema.GetMetadata()) |
| cancel() |
| if errIndexRules != nil { |
| return nil, errIndexRules |
| } |
| if len(idxRules) == len(preResource.GetIndexRules()) { |
| maxModRevision := pbv1.ParseMaxModRevision(idxRules) |
| if preResource.MaxObservedModRevision() >= maxModRevision { |
| return preResource, nil |
| } |
| } |
| } |
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| idxRules, errIndexRules := g.metadata.IndexRules(ctx, resourceSchema.GetMetadata()) |
| cancel() |
| if errIndexRules != nil { |
| return nil, errIndexRules |
| } |
| sm, errTS := g.resourceSupplier.OpenResource(g.groupSchema.GetResourceOpts().ShardNum, g, ResourceSpec{ |
| Schema: resourceSchema, |
| IndexRules: idxRules, |
| }) |
| if errTS != nil { |
| return nil, errTS |
| } |
| if err := g.notify(sm, databasev1.Action_ACTION_PUT); err != nil { |
| return nil, err |
| } |
| g.schemaMap[key] = sm |
| if preResource != nil { |
| _ = preResource.Close() |
| } |
| return sm, nil |
| } |
| |
| func (g *group) deleteResource(metadata *commonv1.Metadata) error { |
| g.mapMutex.Lock() |
| defer g.mapMutex.Unlock() |
| key := metadata.GetName() |
| preResource := g.schemaMap[key] |
| if preResource == nil { |
| return nil |
| } |
| if err := g.notify(preResource, databasev1.Action_ACTION_DELETE); err != nil { |
| return err |
| } |
| delete(g.schemaMap, key) |
| _ = preResource.Close() |
| return nil |
| } |
| |
| func (g *group) LoadResource(name string) (Resource, bool) { |
| g.mapMutex.RLock() |
| s := g.schemaMap[name] |
| g.mapMutex.RUnlock() |
| if s == nil { |
| return nil, false |
| } |
| return s, true |
| } |
| |
| func (g *group) notify(resource Resource, action databasev1.Action) error { |
| now := time.Now() |
| nowPb := timestamppb.New(now) |
| entityLocator := resource.EntityLocator() |
| locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(entityLocator)) |
| for _, tagLocator := range entityLocator { |
| locator = append(locator, &databasev1.EntityEvent_TagLocator{ |
| FamilyOffset: uint32(tagLocator.FamilyOffset), |
| TagOffset: uint32(tagLocator.TagOffset), |
| }) |
| } |
| _, err := g.repo.Publish(g.entityTopic, bus.NewMessage(bus.MessageID(now.UnixNano()), &databasev1.EntityEvent{ |
| Subject: resource.GetMetadata(), |
| EntityLocator: locator, |
| Time: nowPb, |
| Action: action, |
| })) |
| if errors.Is(err, bus.ErrTopicNotExist) { |
| return nil |
| } |
| return err |
| } |
| |
| func (g *group) close() (err error) { |
| g.mapMutex.RLock() |
| for _, s := range g.schemaMap { |
| err = multierr.Append(err, s.Close()) |
| } |
| g.mapMutex.RUnlock() |
| return multierr.Append(err, g.SupplyTSDB().Close()) |
| } |