blob: ace1e06c7864ab1624db812905164fb08e2c0c5d [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package memory provides an in-memory volatile config store implementation
package memory
import (
"errors"
"fmt"
"sync"
"time"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collection"
)
var (
errNotFound = errors.New("item not found")
errAlreadyExists = errors.New("item already exists")
// TODO: can we make this compatible with kerror.IsConflict without imports the library?
errConflict = errors.New("conflicting resource version, try again")
)
const ResourceVersion string = "ResourceVersion"
// Make creates an in-memory config store from a config schemas
// It is with validation
func Make(schemas collection.Schemas) model.ConfigStore {
return newStore(schemas, false)
}
// MakeSkipValidation creates an in-memory config store from a config schemas
// It is without validation
func MakeSkipValidation(schemas collection.Schemas) model.ConfigStore {
return newStore(schemas, true)
}
func newStore(schemas collection.Schemas, skipValidation bool) model.ConfigStore {
out := store{
schemas: schemas,
data: make(map[config.GroupVersionKind]map[string]*sync.Map),
skipValidation: skipValidation,
}
for _, s := range schemas.All() {
out.data[s.Resource().GroupVersionKind()] = make(map[string]*sync.Map)
}
return &out
}
type store struct {
schemas collection.Schemas
data map[config.GroupVersionKind]map[string]*sync.Map
skipValidation bool
mutex sync.RWMutex
}
func (cr *store) Schemas() collection.Schemas {
return cr.schemas
}
func (cr *store) Get(kind config.GroupVersionKind, name, namespace string) *config.Config {
cr.mutex.RLock()
defer cr.mutex.RUnlock()
_, ok := cr.data[kind]
if !ok {
return nil
}
ns, exists := cr.data[kind][namespace]
if !exists {
return nil
}
out, exists := ns.Load(name)
if !exists {
return nil
}
config := out.(config.Config)
return &config
}
func (cr *store) List(kind config.GroupVersionKind, namespace string) ([]config.Config, error) {
cr.mutex.RLock()
defer cr.mutex.RUnlock()
data, exists := cr.data[kind]
if !exists {
return nil, nil
}
out := make([]config.Config, 0, len(cr.data[kind]))
if namespace == "" {
for _, ns := range data {
ns.Range(func(key, value interface{}) bool {
out = append(out, value.(config.Config))
return true
})
}
} else {
ns, exists := data[namespace]
if !exists {
return nil, nil
}
ns.Range(func(key, value interface{}) bool {
out = append(out, value.(config.Config))
return true
})
}
return out, nil
}
func (cr *store) Delete(kind config.GroupVersionKind, name, namespace string, resourceVersion *string) error {
cr.mutex.Lock()
defer cr.mutex.Unlock()
data, ok := cr.data[kind]
if !ok {
return fmt.Errorf("unknown type %v", kind)
}
ns, exists := data[namespace]
if !exists {
return errNotFound
}
_, exists = ns.Load(name)
if !exists {
return errNotFound
}
ns.Delete(name)
return nil
}
func (cr *store) Create(cfg config.Config) (string, error) {
cr.mutex.Lock()
defer cr.mutex.Unlock()
kind := cfg.GroupVersionKind
s, ok := cr.schemas.FindByGroupVersionKind(kind)
if !ok {
return "", fmt.Errorf("unknown type %v", kind)
}
if !cr.skipValidation {
if _, err := s.Resource().ValidateConfig(cfg); err != nil {
return "", err
}
}
ns, exists := cr.data[kind][cfg.Namespace]
if !exists {
ns = new(sync.Map)
cr.data[kind][cfg.Namespace] = ns
}
_, exists = ns.Load(cfg.Name)
if !exists {
tnow := time.Now()
if cfg.ResourceVersion == "" {
cfg.ResourceVersion = tnow.String()
}
// Set the creation timestamp, if not provided.
if cfg.CreationTimestamp.IsZero() {
cfg.CreationTimestamp = tnow
}
ns.Store(cfg.Name, cfg)
return cfg.ResourceVersion, nil
}
return "", errAlreadyExists
}
func (cr *store) Update(cfg config.Config) (string, error) {
cr.mutex.Lock()
defer cr.mutex.Unlock()
kind := cfg.GroupVersionKind
s, ok := cr.schemas.FindByGroupVersionKind(kind)
if !ok {
return "", fmt.Errorf("unknown type %v", kind)
}
if !cr.skipValidation {
if _, err := s.Resource().ValidateConfig(cfg); err != nil {
return "", err
}
}
ns, exists := cr.data[kind][cfg.Namespace]
if !exists {
return "", errNotFound
}
existing, exists := ns.Load(cfg.Name)
if !exists {
return "", errNotFound
}
if hasConflict(existing.(config.Config), cfg) {
return "", errConflict
}
if cfg.Annotations != nil && cfg.Annotations[ResourceVersion] != "" {
cfg.ResourceVersion = cfg.Annotations[ResourceVersion]
delete(cfg.Annotations, ResourceVersion)
} else {
cfg.ResourceVersion = time.Now().String()
}
ns.Store(cfg.Name, cfg)
return cfg.ResourceVersion, nil
}
func (cr *store) UpdateStatus(cfg config.Config) (string, error) {
return cr.Update(cfg)
}
func (cr *store) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
cr.mutex.Lock()
defer cr.mutex.Unlock()
gvk := orig.GroupVersionKind
s, ok := cr.schemas.FindByGroupVersionKind(gvk)
if !ok {
return "", fmt.Errorf("unknown type %v", gvk)
}
cfg, _ := patchFn(orig)
if !cr.skipValidation {
if _, err := s.Resource().ValidateConfig(cfg); err != nil {
return "", err
}
}
_, ok = cr.data[gvk]
if !ok {
return "", errNotFound
}
ns, exists := cr.data[gvk][orig.Namespace]
if !exists {
return "", errNotFound
}
rev := time.Now().String()
cfg.ResourceVersion = rev
ns.Store(cfg.Name, cfg)
return rev, nil
}
// hasConflict checks if the two resources have a conflict, which will block Update calls
func hasConflict(existing, replacement config.Config) bool {
if replacement.ResourceVersion == "" {
// We don't care about resource version, so just always overwrite
return false
}
// We set a resource version but its not matched, it is a conflict
if replacement.ResourceVersion != existing.ResourceVersion {
return true
}
return false
}