blob: cb45f22ed0f461bd612ed73fb92b3f56cd27d92a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package resource
import (
"context"
"errors"
"fmt"
kiemodel "github.com/apache/servicecomb-kie/pkg/model"
kiedb "github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
)
const Config = "config"
func NewConfig(e *v1sync.Event) Resource {
kind := config.GetString("registry.kind", "etcd", config.WithStandby("registry_plugin"))
err := kiedb.Init(kind)
if err != nil {
log.Fatal(fmt.Sprintf("kie datasource[%s] init failed", kind), err)
}
c := &kvConfig{
event: e,
}
c.resource = c
return c
}
type kvConfig struct {
defaultFailHandler
event *v1sync.Event
input *kiemodel.KVDoc
cur *kiemodel.KVDoc
resource docResource
resourceID string
}
func (c *kvConfig) WithDomainProjectContext(ctx context.Context) context.Context {
ctx = util.SetDomainProject(ctx,
c.event.Opts[string(util.CtxDomain)],
c.event.Opts[string(util.CtxProject)])
return ctx
}
func (c *kvConfig) loadInput() error {
c.input = new(kiemodel.KVDoc)
callback := func() {
c.resourceID = c.input.ID
}
param := newInputParam(c.input, callback)
return newInputLoader(
c.event,
param,
param,
param,
).loadInput()
}
func (c *kvConfig) LoadCurrentResource(ctx context.Context) *Result {
err := c.loadInput()
if err != nil {
return FailResult(err)
}
ctx = c.WithDomainProjectContext(ctx)
cur, err := c.resource.Get(ctx, c.resourceID)
if err != nil {
if errors.Is(err, kiedb.ErrKeyNotExists) {
return nil
}
return FailResult(err)
}
c.cur = cur
return nil
}
func (c *kvConfig) NeedOperate(ctx context.Context) *Result {
ck := &checker{
curNotNil: c.cur != nil,
event: c.event,
updateTime: func() (int64, error) {
return c.cur.UpdateTime, nil
},
resourceID: kiedb.TombstoneID(c.input),
}
ck.tombstoneLoader = ck
return ck.needOperate(ctx)
}
func (c *kvConfig) Operate(ctx context.Context) *Result {
ctx = c.WithDomainProjectContext(ctx)
return newOperator(c).operate(ctx, c.event.Action)
}
type docResource interface {
Create(ctx context.Context, doc *kiemodel.KVDoc) error
Get(ctx context.Context, ID string) (*kiemodel.KVDoc, error)
Update(ctx context.Context, doc *kiemodel.KVDoc) error
Delete(ctx context.Context, ID string) error
}
func (c *kvConfig) Create(ctx context.Context, doc *kiemodel.KVDoc) error {
revision, err := kiedb.GetBroker().GetRevisionDao().ApplyRevision(ctx, doc.Domain)
if err != nil {
return fmt.Errorf("apply kv revision failed, %s", err.Error())
}
completeKV(doc, revision)
doc, err = kiedb.GetBroker().GetKVDao().Create(ctx, doc)
if err != nil {
return fmt.Errorf("create kv failed, %s", err.Error())
}
err = kiedb.GetBroker().GetHistoryDao().AddHistory(ctx, doc)
if err != nil {
log.Warn(fmt.Sprintf("can not updateKeyValue version for [%s] [%s] in [%s], err: %s",
doc.Key, doc.Labels, doc.Domain, err))
}
return nil
}
func completeKV(kv *kiemodel.KVDoc, revision int64) {
kv.UpdateRevision = revision
kv.CreateRevision = revision
}
func (c *kvConfig) Get(ctx context.Context, ID string) (*kiemodel.KVDoc, error) {
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
return kiedb.GetBroker().GetKVDao().Get(ctx, &kiemodel.GetKVRequest{
Project: project,
Domain: domain,
ID: ID,
})
}
func (c *kvConfig) Update(ctx context.Context, doc *kiemodel.KVDoc) error {
var err error
doc.UpdateRevision, err = kiedb.GetBroker().GetRevisionDao().ApplyRevision(ctx, doc.Domain)
if err != nil {
return fmt.Errorf("apply kv revision failed, %s", err.Error())
}
err = kiedb.GetBroker().GetKVDao().Update(ctx, doc)
if err != nil {
return fmt.Errorf("update kv failed, %s", err.Error())
}
err = kiedb.GetBroker().GetHistoryDao().AddHistory(ctx, doc)
if err != nil {
log.Warn(fmt.Sprintf("can not add revision for [%s] [%s] in [%s], err: %s",
doc.Key, doc.Labels, doc.Domain, err.Error()))
}
return nil
}
func (c *kvConfig) Delete(ctx context.Context, ID string) error {
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
_, err := kiedb.GetBroker().GetKVDao().FindOneAndDelete(ctx, ID, project, domain)
if err != nil {
return fmt.Errorf("delete kv failed, %s", err.Error())
}
_, err = kiedb.GetBroker().GetRevisionDao().ApplyRevision(ctx, domain)
if err != nil {
return fmt.Errorf("the kv [%s] is deleted, but increase revision failed, %s", ID, err.Error())
}
err = kiedb.GetBroker().GetHistoryDao().DelayDeletionTime(ctx, []string{ID}, project, domain)
if err != nil {
log.Warn(fmt.Sprintf("add delete time to [%s] failed, err: %s", ID, err.Error()))
}
return nil
}
func (c *kvConfig) CreateHandle(ctx context.Context) error {
if c.cur != nil {
log.Warn(fmt.Sprintf("create config but doc id exist, %s", c.resourceID))
return c.UpdateHandle(ctx)
}
return c.resource.Create(ctx, c.input)
}
func (c *kvConfig) UpdateHandle(ctx context.Context) error {
if c.cur == nil {
log.Warn(fmt.Sprintf("update action but account not exist, %s", c.resourceID))
return c.CreateHandle(ctx)
}
return c.resource.Update(ctx, c.input)
}
func (c *kvConfig) DeleteHandle(ctx context.Context) error {
return c.resource.Delete(ctx, c.input.ID)
}