blob: da1d56dd50ac7d13b777471a208fde5f1d75a7ff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package manager
import (
"context"
"errors"
"fmt"
"reflect"
"time"
)
import (
"github.com/sethvargo/go-retry"
)
import (
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
)
type ReadOnlyResourceManager interface {
Get(context.Context, model.Resource, ...store.GetOptionsFunc) error
List(context.Context, model.ResourceList, ...store.ListOptionsFunc) error
}
type ResourceManager interface {
ReadOnlyResourceManager
Create(context.Context, model.Resource, ...store.CreateOptionsFunc) error
Update(context.Context, model.Resource, ...store.UpdateOptionsFunc) error
Delete(context.Context, model.Resource, ...store.DeleteOptionsFunc) error
DeleteAll(context.Context, model.ResourceList, ...store.DeleteAllOptionsFunc) error
}
func NewResourceManager(store store.ResourceStore) ResourceManager {
return &resourcesManager{
Store: store,
}
}
var _ ResourceManager = &resourcesManager{}
type resourcesManager struct {
Store store.ResourceStore
}
func (r *resourcesManager) Get(ctx context.Context, resource model.Resource, fs ...store.GetOptionsFunc) error {
return r.Store.Get(ctx, resource, fs...)
}
func (r *resourcesManager) List(ctx context.Context, list model.ResourceList, fs ...store.ListOptionsFunc) error {
return r.Store.List(ctx, list, fs...)
}
func (r *resourcesManager) Create(ctx context.Context, resource model.Resource, fs ...store.CreateOptionsFunc) error {
if err := model.Validate(resource); err != nil {
return err
}
opts := store.NewCreateOptions(fs...)
var owner model.Resource
if resource.Descriptor().Scope == model.ScopeMesh {
owner = core_mesh.NewMeshResource()
if err := r.Store.Get(ctx, owner, store.GetByKey(opts.Mesh, model.NoMesh)); err != nil {
return MeshNotFound(opts.Mesh)
}
}
if resource.Descriptor().Name == core_mesh.MeshInsightType {
owner = core_mesh.NewMeshResource()
if err := r.Store.Get(ctx, owner, store.GetByKey(opts.Name, model.NoMesh)); err != nil {
return MeshNotFound(opts.Name)
}
}
return r.Store.Create(ctx, resource, append(fs, store.CreatedAt(time.Now()), store.CreateWithOwner(owner))...)
}
func (r *resourcesManager) Delete(ctx context.Context, resource model.Resource, fs ...store.DeleteOptionsFunc) error {
return r.Store.Delete(ctx, resource, fs...)
}
func (r *resourcesManager) DeleteAll(ctx context.Context, list model.ResourceList, fs ...store.DeleteAllOptionsFunc) error {
return DeleteAllResources(r, ctx, list, fs...)
}
func DeleteAllResources(manager ResourceManager, ctx context.Context, list model.ResourceList, fs ...store.DeleteAllOptionsFunc) error {
opts := store.NewDeleteAllOptions(fs...)
if err := manager.List(ctx, list, store.ListByMesh(opts.Mesh)); err != nil {
return err
}
for _, item := range list.GetItems() {
if err := manager.Delete(ctx, item, store.DeleteBy(model.MetaToResourceKey(item.GetMeta()))); err != nil && !store.IsResourceNotFound(err) {
return err
}
}
return nil
}
func (r *resourcesManager) Update(ctx context.Context, resource model.Resource, fs ...store.UpdateOptionsFunc) error {
if err := model.Validate(resource); err != nil {
return err
}
return r.Store.Update(ctx, resource, append(fs, store.ModifiedAt(time.Now()))...)
}
type ConflictRetry struct {
BaseBackoff time.Duration
MaxTimes uint
JitterPercent uint
}
type UpsertOpts struct {
ConflictRetry ConflictRetry
Transactions store.Transactions
}
type UpsertFunc func(opts *UpsertOpts)
func WithConflictRetry(baseBackoff time.Duration, maxTimes uint, jitterPercent uint) UpsertFunc {
return func(opts *UpsertOpts) {
opts.ConflictRetry.BaseBackoff = baseBackoff
opts.ConflictRetry.MaxTimes = maxTimes
opts.ConflictRetry.JitterPercent = jitterPercent
}
}
func WithTransactions(transactions store.Transactions) UpsertFunc {
return func(opts *UpsertOpts) {
opts.Transactions = transactions
}
}
func NewUpsertOpts(fs ...UpsertFunc) UpsertOpts {
opts := UpsertOpts{
Transactions: store.NoTransactions{},
}
for _, f := range fs {
f(&opts)
}
return opts
}
var ErrSkipUpsert = errors.New("don't do upsert")
func Upsert(ctx context.Context, manager ResourceManager, key model.ResourceKey, resource model.Resource, fn func(resource model.Resource) error, fs ...UpsertFunc) error {
opts := NewUpsertOpts(fs...)
upsert := func(ctx context.Context) error {
return store.InTx(ctx, opts.Transactions, func(ctx context.Context) error {
create := false
err := manager.Get(ctx, resource, store.GetBy(key), store.GetConsistent())
if err != nil {
if store.IsResourceNotFound(err) {
create = true
} else {
return err
}
}
if err := fn(resource); err != nil {
if err == ErrSkipUpsert { // Way to skip inserts when there are no change
return nil
}
return err
}
if create {
return manager.Create(ctx, resource, store.CreateBy(key))
} else {
return manager.Update(ctx, resource)
}
})
}
if opts.ConflictRetry.BaseBackoff <= 0 || opts.ConflictRetry.MaxTimes == 0 {
return upsert(ctx)
}
backoff := retry.NewExponential(opts.ConflictRetry.BaseBackoff)
backoff = retry.WithMaxRetries(uint64(opts.ConflictRetry.MaxTimes), backoff)
backoff = retry.WithJitterPercent(uint64(opts.ConflictRetry.JitterPercent), backoff)
return retry.Do(ctx, backoff, func(ctx context.Context) error {
resource.SetMeta(nil)
specType := reflect.TypeOf(resource.GetSpec()).Elem()
zeroSpec := reflect.New(specType).Interface().(model.ResourceSpec)
if err := resource.SetSpec(zeroSpec); err != nil {
return err
}
err := upsert(ctx)
if errors.Is(err, &store.ResourceConflictError{}) {
return retry.RetryableError(err)
}
return err
})
}
type MeshNotFoundError struct {
Mesh string
}
func (m *MeshNotFoundError) Error() string {
return fmt.Sprintf("mesh of name %s is not found", m.Mesh)
}
func MeshNotFound(meshName string) error {
return &MeshNotFoundError{meshName}
}
func IsMeshNotFound(err error) bool {
_, ok := err.(*MeshNotFoundError)
return ok
}