blob: 6c1745d84e79f2734a0889c5fb8d73c9dc2a5d16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kv
import (
"context"
"encoding/json"
"regexp"
"strings"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/util"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/auth"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/cari/sync"
"github.com/go-chassis/openlog"
"github.com/little-cui/etcdadpt"
)
// Dao operates kv data in etcd
type Dao struct {
}
// Create stores a new kv document once the create permission check passes.
// With sync enabled in the write options the kv is written through txnCreate,
// otherwise through create. If the chosen writer reports false,
// datasource.ErrKVAlreadyExists is returned.
func (s *Dao) Create(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) (*model.KVDoc, error) {
	if authErr := auth.CheckCreateKV(ctx, kv); authErr != nil {
		return nil, authErr
	}

	// pick the writer up front; the sync variant also records a task in the same transaction
	writer := create
	if datasource.NewWriteOptions(options...).SyncEnable {
		writer = txnCreate
	}

	inserted, err := writer(ctx, kv)
	switch {
	case err != nil:
		openlog.Error("create error", openlog.WithTags(openlog.Tags{
			"err": err.Error(),
			"kv":  kv,
		}))
		return nil, err
	case !inserted:
		openlog.Error("create error", openlog.WithTags(openlog.Tags{
			"err": datasource.ErrKVAlreadyExists.Error(),
			"kv":  kv,
		}))
		return nil, datasource.ErrKVAlreadyExists
	}
	return kv, nil
}
// create serializes kv as JSON and inserts it under its kv key.
// The boolean result is the one reported by etcdadpt.InsertBytes.
func create(ctx context.Context, kv *model.KVDoc) (bool, error) {
	payload, marshalErr := json.Marshal(kv)
	if marshalErr != nil {
		openlog.Error("fail to marshal kv " + marshalErr.Error())
		return false, marshalErr
	}
	kvKey := key.KV(kv.Domain, kv.Project, kv.ID)
	return etcdadpt.InsertBytes(ctx, kvKey, payload)
}
// txnCreate writes the kv and a create-action sync task in a single transaction.
// The transaction is guarded by "neither key exists yet"; the returned boolean
// is the transaction's Succeeded flag.
func txnCreate(ctx context.Context, kv *model.KVDoc) (bool, error) {
	kvPayload, err := json.Marshal(kv)
	if err != nil {
		openlog.Error("fail to marshal kv " + err.Error())
		return false, err
	}

	syncTask, err := sync.NewTask(kv.Domain, kv.Project, sync.CreateAction, datasource.ConfigResource, kv)
	if err != nil {
		openlog.Error("fail to create task" + err.Error())
		return false, err
	}

	taskPayload, err := json.Marshal(syncTask)
	if err != nil {
		openlog.Error("fail to marshal task ")
		return false, err
	}

	putKV := etcdadpt.OpPut(
		etcdadpt.WithStrKey(key.KV(kv.Domain, kv.Project, kv.ID)),
		etcdadpt.WithValue(kvPayload),
	)
	putTask := etcdadpt.OpPut(
		etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, syncTask.ID, syncTask.Timestamp)),
		etcdadpt.WithValue(taskPayload),
	)

	// only commit when both target keys are absent
	absent := etcdadpt.If(
		etcdadpt.NotExistKey(string(putKV.Key)),
		etcdadpt.NotExistKey(string(putTask.Key)),
	)
	txnResp, err := etcdadpt.TxnWithCmp(ctx, []etcdadpt.OpOptions{putKV, putTask}, absent, nil)
	if err != nil {
		return false, err
	}
	return txnResp.Succeeded, nil
}
// Update overwrites the mutable fields of a stored kv.
//
// The stored document is loaded by kv.Domain/kv.Project/kv.ID, checked with
// auth.CheckUpdateKV, and then only LabelFormat, Value, Status, Checker,
// UpdateTime and UpdateRevision are copied over from kv. The merged document
// is what gets persisted, both with and without sync enabled; with sync
// enabled an update task is written in the same transaction.
//
// It returns datasource.ErrKeyNotExists when no document is stored under the key.
func (s *Dao) Update(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) error {
	keyKV := key.KV(kv.Domain, kv.Project, kv.ID)
	resp, err := etcdadpt.Get(ctx, keyKV)
	if err != nil {
		openlog.Error(err.Error())
		return err
	}
	if resp == nil {
		return datasource.ErrKeyNotExists
	}

	var oldKV model.KVDoc
	if err = json.Unmarshal(resp.Value, &oldKV); err != nil {
		openlog.Error(err.Error())
		return err
	}
	// permission is evaluated against the stored document, not the request
	if err := auth.CheckUpdateKV(ctx, &oldKV); err != nil {
		return err
	}

	oldKV.LabelFormat = kv.LabelFormat
	oldKV.Value = kv.Value
	oldKV.Status = kv.Status
	oldKV.Checker = kv.Checker
	oldKV.UpdateTime = kv.UpdateTime
	oldKV.UpdateRevision = kv.UpdateRevision

	opts := datasource.NewWriteOptions(options...)
	if opts.SyncEnable {
		// if syncEnable is true, will create task in a transaction operation.
		// Persist the merged document so that both branches store the same
		// content and fields outside the merge list cannot be overwritten.
		err = txnUpdate(ctx, &oldKV)
	} else {
		err = update(ctx, &oldKV, options...)
	}
	if err != nil {
		openlog.Error(err.Error())
		return err
	}
	return nil
}
// txnUpdate puts the kv and an update-action sync task in one transaction.
func txnUpdate(ctx context.Context, kv *model.KVDoc) error {
	kvPayload, err := json.Marshal(kv)
	if err != nil {
		openlog.Error(err.Error())
		return err
	}

	syncTask, err := sync.NewTask(kv.Domain, kv.Project, sync.UpdateAction, datasource.ConfigResource, kv)
	if err != nil {
		openlog.Error("fail to create task" + err.Error())
		return err
	}

	taskPayload, err := json.Marshal(syncTask)
	if err != nil {
		openlog.Error(err.Error())
		return err
	}

	ops := []etcdadpt.OpOptions{
		etcdadpt.OpPut(
			etcdadpt.WithStrKey(key.KV(kv.Domain, kv.Project, kv.ID)),
			etcdadpt.WithValue(kvPayload),
		),
		etcdadpt.OpPut(
			etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, syncTask.ID, syncTask.Timestamp)),
			etcdadpt.WithValue(taskPayload),
		),
	}
	return etcdadpt.Txn(ctx, ops)
}
// update serializes kv as JSON and puts it under its kv key.
// The options parameter is accepted but not used by this function.
func update(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) error {
	payload, marshalErr := json.Marshal(kv)
	if marshalErr != nil {
		openlog.Error(marshalErr.Error())
		return marshalErr
	}
	return etcdadpt.PutBytes(ctx, key.KV(kv.Domain, kv.Project, kv.ID), payload)
}
// parenValueRegex matches the first non-empty "(...)" group in a string.
// It is compiled once instead of on every getValue call.
var parenValueRegex = regexp.MustCompile(`\(([^)]+)\)`)

// getValue extracts the text inside the first pair of parentheses of str,
// e.g. "beginWith(abc)" yields "abc".
// It returns "" when str has no non-empty parenthesised group (for example
// "beginWith()" or a string without parentheses) instead of panicking on an
// empty match result.
func getValue(str string) string {
	res := parenValueRegex.FindStringSubmatch(str)
	if len(res) == 0 {
		// FindStringSubmatch returns nil when nothing matches
		return ""
	}
	return res[len(res)-1]
}
// Exist reports whether exactly one kv matches the given key together with the
// labels and label format carried by options. The lookup uses exact label
// matching, is case sensitive and skips the auth filter.
// Zero matches on a unique find (key and label format both set) yields
// (false, nil); any other match count except one yields datasource.ErrTooMany.
func (s *Dao) Exist(ctx context.Context, key, project, domain string, options ...datasource.FindOption) (bool, error) {
	findOpts := datasource.FindOptions{Key: key}
	for _, apply := range options {
		apply(&findOpts)
	}

	found, err := s.listNoAuth(ctx, project, domain,
		datasource.WithExactLabels(),
		datasource.WithLabels(findOpts.Labels),
		datasource.WithLabelFormat(findOpts.LabelFormat),
		datasource.WithKey(key),
		datasource.WithCaseSensitive())
	if err != nil {
		openlog.Error("check kv exist: " + err.Error())
		return false, err
	}

	switch matched := len(found.Data); {
	case matched == 1:
		return true, nil
	case matched == 0 && IsUniqueFind(findOpts):
		return false, nil
	default:
		return false, datasource.ErrTooMany
	}
}
// GetByKey returns the single kv matching the given key together with the
// labels and label format carried by options. The lookup uses exact label
// matching, is case sensitive and skips the auth filter.
// Zero matches on a unique find (key and label format both set) yields
// datasource.ErrKeyNotExists; any other match count except one yields
// datasource.ErrTooMany.
func (s *Dao) GetByKey(ctx context.Context, key, project, domain string, options ...datasource.FindOption) ([]*model.KVDoc, error) {
	findOpts := datasource.FindOptions{Key: key}
	for _, apply := range options {
		apply(&findOpts)
	}

	found, err := s.listNoAuth(ctx, project, domain,
		datasource.WithExactLabels(),
		datasource.WithLabels(findOpts.Labels),
		datasource.WithLabelFormat(findOpts.LabelFormat),
		datasource.WithKey(key),
		datasource.WithCaseSensitive())
	if err != nil {
		openlog.Error("check kv exist: " + err.Error())
		return nil, err
	}

	switch matched := len(found.Data); {
	case matched == 1:
		return found.Data, nil
	case matched == 0 && IsUniqueFind(findOpts):
		return nil, datasource.ErrKeyNotExists
	default:
		return nil, datasource.ErrTooMany
	}
}
// FindOneAndDelete deletes one kv by id and returns the kv as it was before deletion.
// domain=tenant
func (s *Dao) FindOneAndDelete(ctx context.Context, kvID, project, domain string, options ...datasource.WriteOption) (*model.KVDoc, error) {
	if datasource.NewWriteOptions(options...).SyncEnable {
		// sync enabled: the kv delete, the task and the tombstone go into one transaction
		return txnFindOneAndDelete(ctx, kvID, project, domain)
	}
	return findOneAndDelete(ctx, kvID, project, domain)
}
// findOneAndDelete removes the kv stored under the given id and returns the
// document decoded from the delete response.
// It returns datasource.ErrKeyNotExists when the delete response is empty.
func findOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
	// getKVDoc is used here only as a gate: its result is discarded, its error aborts the delete
	if _, err := getKVDoc(ctx, domain, project, kvID); err != nil {
		return nil, err
	}

	deleted, err := etcdadpt.ListAndDelete(ctx, key.KV(domain, project, kvID))
	if err != nil {
		openlog.Error("delete Key error: " + err.Error())
		return nil, err
	}
	if deleted.Count == 0 {
		return nil, datasource.ErrKeyNotExists
	}

	removed := &model.KVDoc{}
	if err = json.Unmarshal(deleted.Kvs[0].Value, removed); err != nil {
		openlog.Error("decode error: " + err.Error())
		return nil, err
	}
	return removed, nil
}
// txnFindOneAndDelete deletes a kv and, in the same transaction, writes a
// delete-action sync task and a tombstone for it. It returns the document
// obtained from getKVDoc before the transaction ran.
func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
	current, err := getKVDoc(ctx, domain, project, kvID)
	if err != nil {
		openlog.Error(err.Error())
		return nil, err
	}

	syncTask, err := sync.NewTask(domain, project, sync.DeleteAction, datasource.ConfigResource, current)
	if err != nil {
		openlog.Error("fail to create task" + err.Error())
		return nil, err
	}
	taskPayload, err := json.Marshal(syncTask)
	if err != nil {
		openlog.Error("fail to marshal task" + err.Error())
		return nil, err
	}

	grave := sync.NewTombstone(domain, project, datasource.ConfigResource, datasource.TombstoneID(current))
	gravePayload, err := json.Marshal(grave)
	if err != nil {
		openlog.Error("fail to marshal tombstone" + err.Error())
		return nil, err
	}

	ops := []etcdadpt.OpOptions{
		etcdadpt.OpDel(etcdadpt.WithStrKey(key.KV(domain, project, kvID))),
		etcdadpt.OpPut(
			etcdadpt.WithStrKey(key.TaskKey(domain, project, syncTask.ID, syncTask.Timestamp)),
			etcdadpt.WithValue(taskPayload),
		),
		etcdadpt.OpPut(
			etcdadpt.WithStrKey(key.TombstoneKey(domain, project, grave.ResourceType, grave.ResourceID)),
			etcdadpt.WithValue(gravePayload),
		),
	}
	if err = etcdadpt.Txn(ctx, ops); err != nil {
		openlog.Error("find and delete error", openlog.WithTags(openlog.Tags{
			"err": err.Error(),
		}))
		return nil, err
	}
	return current, nil
}
// getKVDoc loads the kv stored under the given id, decodes it and runs the
// delete permission check on it.
// It returns datasource.ErrKeyNotExists when nothing is stored under the key.
func getKVDoc(ctx context.Context, domain, project, kvID string) (*model.KVDoc, error) {
	raw, err := etcdadpt.Get(ctx, key.KV(domain, project, kvID))
	switch {
	case err != nil:
		openlog.Error(err.Error())
		return nil, err
	case raw == nil:
		return nil, datasource.ErrKeyNotExists
	}

	doc := new(model.KVDoc)
	if err = json.Unmarshal(raw.Value, doc); err != nil {
		openlog.Error("decode error: " + err.Error())
		return nil, err
	}
	if authErr := auth.CheckDeleteKV(ctx, doc); authErr != nil {
		return nil, authErr
	}
	return doc, nil
}
// FindManyAndDelete deletes several kvs by id and returns the deleted kvs
// together with their count.
func (s *Dao) FindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string, options ...datasource.WriteOption) ([]*model.KVDoc, int64, error) {
	if datasource.NewWriteOptions(options...).SyncEnable {
		// sync enabled: kv deletes, tasks and tombstones go into one transaction
		return txnFindManyAndDelete(ctx, kvIDs, project, domain)
	}
	return findManyAndDelete(ctx, kvIDs, project, domain)
}
// findManyAndDelete deletes every requested kv for which getKVDoc succeeds and
// returns the documents decoded from the delete response plus the response count.
// Ids for which getKVDoc fails are skipped; when every id is skipped an empty
// slice is returned along with the error of the last lookup (nil for an empty
// id list).
func findManyAndDelete(ctx context.Context, kvIDs []string, project, domain string) ([]*model.KVDoc, int64, error) {
	var (
		lookupErr error
		delOps    []etcdadpt.OpOptions
	)
	for _, kvID := range kvIDs {
		if _, lookupErr = getKVDoc(ctx, domain, project, kvID); lookupErr != nil {
			continue
		}
		delOps = append(delOps, etcdadpt.OpDel(etcdadpt.WithStrKey(key.KV(domain, project, kvID))))
	}
	if len(delOps) == 0 {
		return []*model.KVDoc{}, 0, lookupErr
	}

	deleted, err := etcdadpt.ListAndDeleteMany(ctx, delOps...)
	if err != nil {
		openlog.Error("find Keys error: " + err.Error())
		return nil, 0, err
	}
	if deleted.Count == 0 {
		return nil, 0, datasource.ErrKeyNotExists
	}

	var removed []*model.KVDoc
	for _, item := range deleted.Kvs {
		doc := &model.KVDoc{}
		if decodeErr := json.Unmarshal(item.Value, doc); decodeErr != nil {
			openlog.Error("fail to unmarshal kv" + decodeErr.Error())
			return nil, 0, decodeErr
		}
		removed = append(removed, doc)
	}
	return removed, deleted.Count, nil
}
// txnFindManyAndDelete deletes several kvs and, in the same transaction,
// writes one delete-action sync task and one tombstone per deleted kv.
//
// Each id is first resolved through getKVDoc. Ids that resolve to
// datasource.ErrKeyNotExists (or to a nil document) are skipped; any other
// lookup error aborts the whole call. Only ids that were resolved are placed
// in the transaction, so a key that appears after its lookup was skipped is
// never removed without a matching task and tombstone.
//
// It returns the resolved documents and their count, or
// datasource.ErrKeyNotExists when no id could be resolved.
func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string) ([]*model.KVDoc, int64, error) {
	kvTotalNum := len(kvIDs)
	docs := make([]*model.KVDoc, 0, kvTotalNum)
	tasks := make([]*sync.Task, 0, kvTotalNum)
	tombstones := make([]*sync.Tombstone, 0, kvTotalNum)
	// ids whose documents were found and passed the checks in getKVDoc
	foundIDs := make([]string, 0, kvTotalNum)

	for _, id := range kvIDs {
		kvDoc, err := getKVDoc(ctx, domain, project, id)
		if err != nil {
			// if not find the kv, continue
			if err == datasource.ErrKeyNotExists {
				openlog.Error(err.Error())
				continue
			}
			return nil, 0, err
		}
		if kvDoc == nil {
			continue
		}
		task, err := sync.NewTask(domain, project, sync.DeleteAction, datasource.ConfigResource, kvDoc)
		if err != nil {
			openlog.Error("fail to create task")
			return nil, 0, err
		}
		docs = append(docs, kvDoc)
		tasks = append(tasks, task)
		tombstones = append(tombstones, sync.NewTombstone(domain, project, datasource.ConfigResource,
			datasource.TombstoneID(kvDoc)))
		foundIDs = append(foundIDs, id)
	}
	successKVNum := len(docs)
	if successKVNum == 0 {
		return nil, 0, datasource.ErrKeyNotExists
	}

	// op order is kept as: all deletes, then all tasks, then all tombstones
	opOptions := make([]etcdadpt.OpOptions, 0, 3*successKVNum)
	for _, id := range foundIDs {
		opOptions = append(opOptions, etcdadpt.OpDel(etcdadpt.WithStrKey(key.KV(domain, project, id))))
	}
	for _, task := range tasks {
		taskBytes, err := json.Marshal(task)
		if err != nil {
			openlog.Error("fail to marshal task" + err.Error())
			return nil, 0, err
		}
		opOptions = append(opOptions, etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(domain, project,
			task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes)))
	}
	for _, tombstone := range tombstones {
		tombstoneBytes, err := json.Marshal(tombstone)
		if err != nil {
			openlog.Error("fail to marshal tombstone" + err.Error())
			return nil, 0, err
		}
		opOptions = append(opOptions, etcdadpt.OpPut(etcdadpt.WithStrKey(key.TombstoneKey(domain, project,
			tombstone.ResourceType, tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes)))
	}

	if err := etcdadpt.Txn(ctx, opOptions); err != nil {
		openlog.Error("find many and delete error", openlog.WithTags(openlog.Tags{
			"err": err.Error(),
		}))
		return nil, 0, err
	}
	return docs, int64(successKVNum), nil
}
// Get loads the kv identified by req.Domain/req.Project/req.ID, decodes it and
// runs the get permission check on it.
// It returns datasource.ErrKeyNotExists when nothing is stored under the key.
func (s *Dao) Get(ctx context.Context, req *model.GetKVRequest) (*model.KVDoc, error) {
	raw, err := etcdadpt.Get(ctx, key.KV(req.Domain, req.Project, req.ID))
	switch {
	case err != nil:
		openlog.Error(err.Error())
		return nil, err
	case raw == nil:
		return nil, datasource.ErrKeyNotExists
	}

	doc := new(model.KVDoc)
	if err = json.Unmarshal(raw.Value, doc); err != nil {
		openlog.Error("decode error: " + err.Error())
		return nil, err
	}
	if authErr := auth.CheckGetKV(ctx, doc); authErr != nil {
		return nil, authErr
	}
	return doc, nil
}
// Total returns the count reported by a count-only list over the kv prefix of
// the given domain and project.
func (s *Dao) Total(ctx context.Context, project, domain string) (int64, error) {
	prefix := key.KVList(domain, project)
	_, count, err := etcdadpt.List(ctx, prefix, etcdadpt.WithCountOnly())
	if err != nil {
		openlog.Error("find total number: " + err.Error())
		return 0, err
	}
	return count, nil
}
// List returns the kvs matching the find options, passed through
// auth.FilterKVList and then paged. Total is set to the number of kvs left
// after the auth filter.
func (s *Dao) List(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) {
	listed, findOpts, err := s.listData(ctx, project, domain, options...)
	if err != nil {
		return nil, err
	}

	permitted, err := auth.FilterKVList(ctx, listed.Data)
	if err != nil {
		return nil, err
	}
	listed.Data, listed.Total = permitted, len(permitted)
	return pagingResult(listed, findOpts), nil
}
// listNoAuth returns the paged kvs matching the find options without applying
// the auth filter that List applies.
func (s *Dao) listNoAuth(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) {
	listed, findOpts, listErr := s.listData(ctx, project, domain, options...)
	if listErr != nil {
		return nil, listErr
	}
	paged := pagingResult(listed, findOpts)
	return paged, nil
}
// listData resolves the find options, builds the key regex and collects the
// matching kvs. When the cache is enabled and Search reports that it handled
// the request without error, the cached result is returned; otherwise the kvs
// are listed from etcd and filtered. The resolved options are returned so
// callers can page the result.
func (s *Dao) listData(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, datasource.FindOptions, error) {
	findOpts := datasource.NewDefaultFindOpts()
	for _, apply := range options {
		apply(&findOpts)
	}

	ctx, cancel := context.WithTimeout(ctx, findOpts.Timeout)
	defer cancel()

	keyRegex, err := toRegex(findOpts)
	if err != nil {
		return nil, findOpts, err
	}

	if Enabled() {
		cached, usedCache, cacheErr := Search(ctx, &CacheSearchReq{
			Domain:  domain,
			Project: project,
			Opts:    &findOpts,
			Regex:   keyRegex,
		})
		if usedCache {
			if cacheErr == nil {
				return cached, findOpts, nil
			}
			// a failed cache search is logged and the etcd search below takes over
			openlog.Error("using cache to search kv failed: " + cacheErr.Error())
		}
	}

	listed, err := matchLabelsSearch(ctx, domain, project, keyRegex, findOpts)
	if err != nil {
		openlog.Error("list kv failed: " + err.Error())
		return nil, findOpts, err
	}
	return listed, findOpts, nil
}
// matchLabelsSearch lists every kv under the domain/project prefix and keeps
// those accepted by filterMatch. Entries that fail to decode are logged and
// skipped. For a unique find the scan stops after the first match.
func matchLabelsSearch(ctx context.Context, domain, project string, regex *regexp.Regexp, opts datasource.FindOptions) (*model.KVResponse, error) {
	openlog.Debug("using labels to search kv")

	rawKVs, _, err := etcdadpt.List(ctx, key.KVList(domain, project))
	if err != nil {
		return nil, err
	}

	matched := &model.KVResponse{Data: []*model.KVDoc{}}
	stopAtFirst := IsUniqueFind(opts)
	for _, item := range rawKVs {
		doc := &model.KVDoc{}
		if decodeErr := json.Unmarshal(item.Value, doc); decodeErr != nil {
			openlog.Error("decode to KVList error: " + decodeErr.Error())
			continue
		}
		if !filterMatch(doc, opts, regex) {
			continue
		}
		datasource.ClearPart(doc)
		matched.Data = append(matched.Data, doc)
		matched.Total++
		if stopAtFirst {
			break
		}
	}
	return matched, nil
}
// IsUniqueFind reports whether opts has both a key and a label format set.
func IsUniqueFind(opts datasource.FindOptions) bool {
	if opts.Key == "" {
		return false
	}
	return opts.LabelFormat != ""
}
// toRegex turns opts.Key into an anchored regular expression.
//   - an empty key yields (nil, nil);
//   - "beginWith(x)" matches keys starting with x;
//   - "wildcard(x)" treats every "*" in x as "any sequence";
//   - any other key is matched as given.
//
// In all forms "." is escaped so that it matches a literal dot. The match is
// case-insensitive unless opts.CaseSensitive is set.
func toRegex(opts datasource.FindOptions) (*regexp.Regexp, error) {
	if opts.Key == "" {
		return nil, nil
	}

	escapeDots := func(s string) string {
		return strings.ReplaceAll(s, ".", "\\.")
	}

	var pattern string
	if strings.HasPrefix(opts.Key, "beginWith(") {
		pattern = escapeDots(getValue(opts.Key)) + ".*"
	} else if strings.HasPrefix(opts.Key, "wildcard(") {
		pattern = strings.ReplaceAll(escapeDots(getValue(opts.Key)), "*", ".*")
	} else {
		pattern = escapeDots(opts.Key)
	}

	pattern = "^" + pattern + "$"
	if !opts.CaseSensitive {
		pattern = "(?i)" + pattern
	}

	compiled, compileErr := regexp.Compile(pattern)
	if compileErr != nil {
		openlog.Error("invalid wildcard expr: " + pattern + ", error: " + compileErr.Error())
		return nil, compileErr
	}
	return compiled, nil
}
// pagingResult reorders result.Data with datasource.ReverseByUpdateRev and then
// cuts it down to the window [Offset, Offset+Limit). A Limit of 0 disables
// paging; an Offset at or beyond result.Total yields an empty Data slice.
// result is modified in place and returned.
func pagingResult(result *model.KVResponse, opts datasource.FindOptions) *model.KVResponse {
	datasource.ReverseByUpdateRev(result.Data)
	if opts.Limit == 0 {
		return result
	}

	count := int64(result.Total)
	start := opts.Offset
	if start >= count {
		result.Data = []*model.KVDoc{}
		return result
	}

	// clamp the upper bound to the number of available entries
	stop := start + opts.Limit
	if stop > count {
		stop = count
	}
	result.Data = result.Data[start:stop]
	return result
}
// filterMatch reports whether doc satisfies every criterion set in opts:
// status, key regex (when regex is non-nil), labels (equivalent labels when
// opts.ExactLabels is set, contained labels otherwise) and label format.
// Criteria left empty in opts are not checked.
func filterMatch(doc *model.KVDoc, opts datasource.FindOptions, regex *regexp.Regexp) bool {
	hasLabels := len(opts.Labels) != 0
	switch {
	case opts.Status != "" && doc.Status != opts.Status:
		return false
	case regex != nil && !regex.MatchString(doc.Key):
		return false
	case hasLabels && opts.ExactLabels && !util.IsEquivalentLabel(opts.Labels, doc.Labels):
		return false
	case hasLabels && !opts.ExactLabels && !util.IsContainLabel(doc.Labels, opts.Labels):
		return false
	case opts.LabelFormat != "" && doc.LabelFormat != opts.LabelFormat:
		return false
	default:
		return true
	}
}