// blob: e7c27cfd2c5cbfb156e31c33dbf957dad996a791 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package memory
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
)
import (
"github.com/pkg/errors"
)
import (
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
"github.com/apache/dubbo-kubernetes/pkg/events"
)
// resourceKey identifies a stored record: two records are the same resource
// when their type, name and mesh are all equal.
type resourceKey struct {
	Name, Mesh, ResourceType string
}
// memoryStoreRecord is the storage representation of one resource held by the
// in-memory store.
type memoryStoreRecord struct {
	// resourceKey identifies the record by resource type, name and mesh.
	resourceKey
	// Version is the record's current version.
	Version memoryVersion
	// Spec is the resource spec serialized as JSON.
	Spec string
	// CreationTime and ModificationTime are copied into the resource meta
	// whenever the record is read back.
	CreationTime, ModificationTime time.Time
	// Children lists the keys of records that were created with this record as
	// their owner; they are deleted together with it.
	Children []*resourceKey
	// Labels are the labels attached to the resource.
	Labels map[string]string
}
// memoryStoreRecords is an alias for the slice of record pointers that the
// store keeps its resources in.
type memoryStoreRecords = []*memoryStoreRecord
// Compile-time check that *memoryMeta satisfies core_model.ResourceMeta.
var _ core_model.ResourceMeta = (*memoryMeta)(nil)

// memoryMeta is the core_model.ResourceMeta implementation attached to
// resources that pass through the in-memory store.
type memoryMeta struct {
	Name, Mesh                     string
	Version                        memoryVersion
	CreationTime, ModificationTime time.Time
	Labels                         map[string]string
}

// GetName returns the resource name.
func (m memoryMeta) GetName() string {
	return m.Name
}

// GetNameExtensions always reports that name extensions are unsupported.
func (m memoryMeta) GetNameExtensions() core_model.ResourceNameExtensions {
	return core_model.ResourceNameExtensionsUnsupported
}

// GetMesh returns the mesh the resource belongs to.
func (m memoryMeta) GetMesh() string {
	return m.Mesh
}

// GetVersion returns the version formatted as a decimal string.
func (m memoryMeta) GetVersion() string {
	version := m.Version
	return version.String()
}

// GetCreationTime returns the time recorded when the resource was created.
func (m memoryMeta) GetCreationTime() time.Time {
	return m.CreationTime
}

// GetModificationTime returns the time recorded by the last modification.
func (m memoryMeta) GetModificationTime() time.Time {
	return m.ModificationTime
}

// GetLabels returns the label map itself, not a copy.
func (m memoryMeta) GetLabels() map[string]string {
	return m.Labels
}
// memoryVersion is the version counter of a stored record.
type memoryVersion uint64

// initialVersion returns the version given to a newly created record.
func initialVersion() memoryVersion {
	const first memoryVersion = 1
	return first
}

// Next returns the version that follows v.
func (v memoryVersion) Next() memoryVersion {
	return v + 1
}

// String formats v as a base-10 number.
func (v memoryVersion) String() string {
	n := uint64(v)
	return strconv.FormatUint(n, 10)
}
// Compile-time check that *memoryStore satisfies store.ResourceStore.
var _ store.ResourceStore = (*memoryStore)(nil)

// memoryStore is a store.ResourceStore that keeps every resource in a slice in
// process memory.
type memoryStore struct {
	// records holds all stored resources in creation order.
	records memoryStoreRecords
	// mu is taken by every exported method before it touches records or
	// eventWriter.
	mu sync.RWMutex
	// eventWriter, when non-nil, receives a ResourceChangedEvent for each
	// successful create, update and delete.
	eventWriter events.Emitter
}
// NewStore returns an empty in-memory resource store with no event writer set.
func NewStore() store.ResourceStore {
	return new(memoryStore)
}
// SetEventWriter installs writer as the destination for resource change
// events, replacing any previous one. The swap happens under the store's write
// lock.
func (c *memoryStore) SetEventWriter(writer events.Emitter) {
	c.mu.Lock()
	c.eventWriter = writer
	c.mu.Unlock()
}
// Create adds r to the store under the name and mesh supplied through
// CreateOptions (store.CreateBy or store.CreateByKey).
//
// The spec is serialized and the optional owner is resolved before anything is
// modified, so a failed Create leaves both the store and r untouched. On
// success r receives a memoryMeta carrying the initial version, the new record
// is appended to the store and, if opts.Owner is set, it is registered as a
// child of the owner's record so that deleting the owner deletes it as well.
//
// When an event writer is configured, an events.Create ResourceChangedEvent is
// sent from a separate goroutine.
//
// An error is returned when the options carry neither a name nor a mesh, when
// a record with the same type, name and mesh already exists, when the spec
// cannot be converted to JSON, or when the owner is not present in the store.
func (c *memoryStore) Create(_ context.Context, r core_model.Resource, fs ...store.CreateOptionsFunc) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	opts := store.NewCreateOptions(fs...)
	// The key must be provided via CreateOptions; only a completely empty key
	// (no name and no mesh) is rejected here.
	if opts.Name == "" && opts.Mesh == "" {
		return errors.New("you must pass store.CreateBy or store.CreateByKey as a parameter")
	}

	resourceType := r.Descriptor().Name
	if _, existing := c.findRecord(string(resourceType), opts.Name, opts.Mesh); existing != nil {
		return store.ErrorResourceAlreadyExists(resourceType, opts.Name, opts.Mesh)
	}

	meta := memoryMeta{
		Name:             opts.Name,
		Mesh:             opts.Mesh,
		Version:          initialVersion(),
		CreationTime:     opts.CreationTime,
		ModificationTime: opts.CreationTime,
		Labels:           opts.Labels,
	}

	// convert into storage representation
	record, err := c.marshalRecord(string(resourceType), meta, r.GetSpec())
	if err != nil {
		return err
	}

	var ownerRecord *memoryStoreRecord
	if opts.Owner != nil {
		ownerType := opts.Owner.Descriptor().Name
		ownerMeta := opts.Owner.GetMeta()
		_, ownerRecord = c.findRecord(string(ownerType), ownerMeta.GetName(), ownerMeta.GetMesh())
		if ownerRecord == nil {
			return store.ErrorResourceNotFound(ownerType, ownerMeta.GetName(), ownerMeta.GetMesh())
		}
	}

	// Every check has passed: only now touch r and the store.
	r.SetMeta(meta)
	if ownerRecord != nil {
		ownerRecord.Children = append(ownerRecord.Children, &record.resourceKey)
	}
	// persist
	c.records = append(c.records, record)

	// Snapshot the writer and the event while the lock is held. The goroutine
	// must not read c.eventWriter or r later on: by then the lock is released,
	// SetEventWriter may have replaced the writer and the caller may be
	// modifying r.
	if writer := c.eventWriter; writer != nil {
		event := events.ResourceChangedEvent{
			Operation: events.Create,
			Type:      resourceType,
			Key:       core_model.MetaToResourceKey(r.GetMeta()),
		}
		go writer.Send(event)
	}
	return nil
}
// Update replaces the stored spec of r and bumps its version.
//
// r must carry the memoryMeta it received from this store (via Create, Get or
// List); the record is located by the type of r plus the name and mesh in that
// meta. The version in the meta must equal the stored version, otherwise the
// update is rejected. A missing record is reported the same way, as a
// conflict.
//
// The spec is serialized before anything is modified, so a failed Update
// leaves both the stored record and r untouched. On success the record and r's
// meta get the next version, opts.ModificationTime and opts.Labels (the labels
// are replaced, not merged).
//
// When an event writer is configured, an events.Update ResourceChangedEvent is
// sent from a separate goroutine.
func (c *memoryStore) Update(_ context.Context, r core_model.Resource, fs ...store.UpdateOptionsFunc) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	opts := store.NewUpdateOptions(fs...)

	meta, ok := (r.GetMeta()).(memoryMeta)
	if !ok {
		return fmt.Errorf("MemoryStore.Update() requires r.GetMeta() to be of type memoryMeta")
	}

	resourceType := r.Descriptor().Name
	// Name must be provided via r.GetMeta()
	_, record := c.findRecord(string(resourceType), meta.Name, meta.Mesh)
	if record == nil || meta.Version != record.Version {
		return store.ErrorResourceConflict(resourceType, meta.Name, meta.Mesh)
	}

	// Serialize first: if this fails, neither the record nor r may change.
	content, err := core_model.ToJSON(r.GetSpec())
	if err != nil {
		return err
	}

	meta.Version = meta.Version.Next()
	meta.ModificationTime = opts.ModificationTime
	meta.Labels = opts.Labels
	r.SetMeta(meta)

	record.Version = meta.Version
	record.ModificationTime = meta.ModificationTime
	record.Labels = meta.Labels
	record.Spec = string(content)

	// Snapshot the writer and the event while the lock is held. The goroutine
	// must not read c.eventWriter or r later on: by then the lock is released,
	// SetEventWriter may have replaced the writer and the caller may be
	// modifying r.
	if writer := c.eventWriter; writer != nil {
		event := events.ResourceChangedEvent{
			Operation: events.Update,
			Type:      resourceType,
			Key:       core_model.MetaToResourceKey(r.GetMeta()),
		}
		go writer.Send(event)
	}
	return nil
}
// Delete removes the resource selected by the DeleteOptions, together with the
// resources that were created with it as their owner. It holds the store's
// write lock for the whole operation; ctx is not used.
func (c *memoryStore) Delete(ctx context.Context, r core_model.Resource, fs ...store.DeleteOptionsFunc) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	err := c.delete(r, fs...)
	return err
}
// delete removes the record of r's type with the name and mesh given in the
// DeleteOptions, after recursively deleting every child registered on that
// record. It does no locking itself: the caller must already hold c.mu for
// writing.
//
// r's meta must be nil or a memoryMeta. An error is returned when the record
// does not exist or when a child cannot be instantiated, decoded or deleted;
// children deleted before such a failure stay deleted.
//
// When an event writer is configured, an events.Delete ResourceChangedEvent is
// sent from a separate goroutine for every record that was removed.
func (c *memoryStore) delete(r core_model.Resource, fs ...store.DeleteOptionsFunc) error {
	opts := store.NewDeleteOptions(fs...)

	_, ok := (r.GetMeta()).(memoryMeta)
	if r.GetMeta() != nil && !ok {
		return fmt.Errorf("MemoryStore.Delete() requires r.GetMeta() either to be nil or to be of type memoryMeta")
	}

	resourceType := r.Descriptor().Name
	// Name must be provided via DeleteOptions
	_, record := c.findRecord(string(resourceType), opts.Name, opts.Mesh)
	if record == nil {
		return store.ErrorResourceNotFound(resourceType, opts.Name, opts.Mesh)
	}

	for _, child := range record.Children {
		_, childRecord := c.findRecord(child.ResourceType, child.Name, child.Mesh)
		if childRecord == nil {
			continue // resource was already deleted
		}
		obj, err := registry.Global().NewObject(core_model.ResourceType(child.ResourceType))
		if err != nil {
			return fmt.Errorf("MemoryStore.Delete() couldn't unmarshal child resource: %w", err)
		}
		if err := c.unmarshalRecord(childRecord, obj); err != nil {
			return fmt.Errorf("MemoryStore.Delete() couldn't unmarshal child resource: %w", err)
		}
		if err := c.delete(obj, store.DeleteByKey(childRecord.Name, childRecord.Mesh)); err != nil {
			return fmt.Errorf("MemoryStore.Delete() couldn't delete linked child resource: %w", err)
		}
	}

	// The cascading deletes above reshape c.records, so the position of this
	// record is looked up only now instead of trusting an index taken earlier.
	if idx, _ := c.findRecord(string(resourceType), opts.Name, opts.Mesh); idx >= 0 {
		last := len(c.records) - 1
		copy(c.records[idx:], c.records[idx+1:])
		c.records[last] = nil // do not keep the removed record reachable from the backing array
		c.records = c.records[:last]
	}

	// Snapshot the writer and the event while the lock is held. The goroutine
	// must not read c.eventWriter or r later on: by then the lock may be
	// released and SetEventWriter may have replaced the writer.
	if writer := c.eventWriter; writer != nil {
		event := events.ResourceChangedEvent{
			Operation: events.Delete,
			Type:      resourceType,
			Key: core_model.ResourceKey{
				Mesh: opts.Mesh,
				Name: opts.Name,
			},
		}
		go writer.Send(event)
	}
	return nil
}
// Get loads the resource of r's type with the name and mesh given in the
// GetOptions into r, filling both its meta and its spec. It runs under the
// store's read lock.
//
// It returns store.ErrorResourceNotFound when no such record exists and
// store.ErrorResourceConflict when opts.Version is set and differs from the
// stored version.
func (c *memoryStore) Get(_ context.Context, r core_model.Resource, fs ...store.GetOptionsFunc) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	opts := store.NewGetOptions(fs...)
	resourceType := r.Descriptor().Name

	// Name must be provided via GetOptions
	_, found := c.findRecord(string(resourceType), opts.Name, opts.Mesh)
	switch {
	case found == nil:
		return store.ErrorResourceNotFound(resourceType, opts.Name, opts.Mesh)
	case opts.Version != "" && opts.Version != found.Version.String():
		return store.ErrorResourceConflict(resourceType, opts.Name, opts.Mesh)
	}
	return c.unmarshalRecord(found, r)
}
// List appends to rs every stored resource of rs's item type, filtered by
// opts.Mesh and opts.NameContains when those are set, and sets the pagination
// total to the number of matches. It runs under the store's read lock and
// stops at the first record that fails to decode.
func (c *memoryStore) List(_ context.Context, rs core_model.ResourceList, fs ...store.ListOptionsFunc) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	opts := store.NewListOptions(fs...)
	itemType := string(rs.GetItemType())

	matched := c.findRecords(itemType, opts.Mesh, opts.NameContains)
	for _, rec := range matched {
		item := rs.NewItem()
		if err := c.unmarshalRecord(rec, item); err != nil {
			return err
		}
		// NOTE(review): the error is dropped on purpose in the original code;
		// presumably AddItem cannot fail for an item created by rs.NewItem().
		_ = rs.AddItem(item)
	}

	total := uint32(len(matched))
	rs.GetPagination().SetTotal(total)
	return nil
}
// findRecord looks for the first record whose type, name and mesh all match
// and returns its position in c.records together with the record. When there
// is no match it returns -1 and nil. It does no locking of its own.
func (c *memoryStore) findRecord(
	resourceType string, name string, mesh string,
) (int, *memoryStoreRecord) {
	for i := range c.records {
		candidate := c.records[i]
		if candidate.ResourceType != resourceType ||
			candidate.Name != name ||
			candidate.Mesh != mesh {
			continue
		}
		return i, candidate
	}
	return -1, nil
}
// findRecords collects, in storage order, every record of the given
// resourceType. A non-empty mesh keeps only records of that mesh and a
// non-empty contains keeps only records whose name includes that substring.
// The result is an empty, non-nil slice when nothing matches. It does no
// locking of its own.
func (c *memoryStore) findRecords(resourceType string, mesh string, contains string) []*memoryStoreRecord {
	matches := []*memoryStoreRecord{}
	for _, rec := range c.records {
		typeOK := rec.ResourceType == resourceType
		meshOK := mesh == "" || rec.Mesh == mesh
		nameOK := contains == "" || strings.Contains(rec.Name, contains)
		if typeOK && meshOK && nameOK {
			matches = append(matches, rec)
		}
	}
	return matches
}
// marshalRecord builds the storage record for a resource of the given type
// from its meta and spec. The spec is stored as JSON; the key, version,
// timestamps and labels are taken from meta. It returns the serialization
// error, and no record, when the spec cannot be converted.
func (c *memoryStore) marshalRecord(resourceType string, meta memoryMeta, spec core_model.ResourceSpec) (*memoryStoreRecord, error) {
	// convert spec into storage representation
	specJSON, err := core_model.ToJSON(spec)
	if err != nil {
		return nil, err
	}

	// Name must be provided via CreateOptions
	key := resourceKey{
		ResourceType: resourceType,
		Name:         meta.Name,
		Mesh:         meta.Mesh,
	}
	rec := &memoryStoreRecord{
		resourceKey:      key,
		Version:          meta.Version,
		Spec:             string(specJSON),
		CreationTime:     meta.CreationTime,
		ModificationTime: meta.ModificationTime,
		Labels:           meta.Labels,
	}
	return rec, nil
}
// unmarshalRecord copies the stored record s into r: it first sets r's meta to
// a memoryMeta built from the record (the label map is shared, not copied) and
// then decodes the stored JSON into r's spec, returning any decoding error.
func (c *memoryStore) unmarshalRecord(s *memoryStoreRecord, r core_model.Resource) error {
	meta := memoryMeta{
		Name:             s.Name,
		Mesh:             s.Mesh,
		Version:          s.Version,
		CreationTime:     s.CreationTime,
		ModificationTime: s.ModificationTime,
		Labels:           s.Labels,
	}
	r.SetMeta(meta)

	raw := []byte(s.Spec)
	return core_model.FromJSON(raw, r.GetSpec())
}