increase revision counter when delete key (#106)
diff --git a/server/pubsub/struct.go b/server/pubsub/struct.go
index 1e8cdae..df67116 100644
--- a/server/pubsub/struct.go
+++ b/server/pubsub/struct.go
@@ -20,9 +20,16 @@
import (
"encoding/json"
"errors"
- "github.com/apache/servicecomb-kie/pkg/common"
"reflect"
"strings"
+
+ "github.com/apache/servicecomb-kie/pkg/common"
+)
+
+// const
+const (
+ ActionPut = "put"
+ ActionDelete = "delete"
)
//KVChangeEvent is event between kie nodes, and broadcast by serf
diff --git a/server/resource/v1/kv_resource.go b/server/resource/v1/kv_resource.go
index 463dfb7..2c36103 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -72,7 +72,7 @@
Labels: kv.Labels,
Project: project,
DomainID: kv.Domain,
- Action: "put",
+ Action: pubsub.ActionPut,
})
if err != nil {
openlogging.Warn("lost kv change event:" + err.Error())
@@ -326,7 +326,17 @@
WriteErrResponse(context, http.StatusBadRequest, common.ErrKvIDMustNotEmpty, common.ContentTypeText)
return
}
- err := service.KVService.Delete(context.Ctx, kvID, domain.(string), project)
+ result, err := service.KVService.FindKV(context.Ctx, domain.(string), project,
+ service.WithID(kvID))
+ if err != nil && err != service.ErrKeyNotExists {
+ WriteErrResponse(context, http.StatusInternalServerError, err.Error(), common.ContentTypeText)
+ return
+ } else if err == service.ErrKeyNotExists {
+ context.WriteHeader(http.StatusNoContent)
+ return
+ }
+ kv := result[0].Data[0]
+ err = service.KVService.Delete(context.Ctx, kvID, domain.(string), project)
if err != nil {
openlogging.Error("delete failed ,", openlogging.WithTags(openlogging.Tags{
"kvID": kvID,
@@ -335,6 +345,16 @@
WriteErrResponse(context, http.StatusInternalServerError, err.Error(), common.ContentTypeText)
return
}
+ err = pubsub.Publish(&pubsub.KVChangeEvent{
+ Key: kv.Key,
+ Labels: kv.Labels,
+ Project: project,
+ DomainID: domain.(string),
+ Action: pubsub.ActionDelete,
+ })
+ if err != nil {
+ openlogging.Warn("lost kv change event:" + err.Error())
+ }
context.WriteHeader(http.StatusNoContent)
}
diff --git a/server/service/mongo/kv/kv_dao.go b/server/service/mongo/kv/kv_dao.go
index f96114a..9b978db 100644
--- a/server/service/mongo/kv/kv_dao.go
+++ b/server/service/mongo/kv/kv_dao.go
@@ -167,6 +167,10 @@
//deleteKV by kvID
func deleteKV(ctx context.Context, kvID, project, domain string) error {
+ if _, err := counter.ApplyRevision(ctx, domain); err != nil {
+ openlogging.Error(fmt.Sprintf("increase revision failed, the kv [%s] not deleted: [%s]", kvID, err))
+ return err
+ }
collection := session.GetDB().Collection(session.CollectionKV)
dr, err := collection.DeleteOne(ctx, bson.M{"id": kvID, "project": project, "domain": domain})
//check error and delete number
@@ -229,3 +233,14 @@
return findKeys(ctx, filter, true)
}
+
+//findKVByID get kvs by kv id
+func findKVDocByID(ctx context.Context, domain, project, kvID string) (*model.KVDoc, error) {
+ filter := bson.M{"id": kvID, "domain": domain, "project": project}
+ kvs, err := findOneKey(ctx, filter)
+ if err != nil {
+ openlogging.Error(err.Error())
+ return nil, err
+ }
+ return kvs[0], nil
+}
diff --git a/server/service/mongo/kv/kv_service.go b/server/service/mongo/kv/kv_service.go
index 7e95ddd..f828caa 100644
--- a/server/service/mongo/kv/kv_service.go
+++ b/server/service/mongo/kv/kv_service.go
@@ -20,7 +20,6 @@
import (
"context"
"errors"
- "fmt"
"reflect"
"time"
@@ -35,6 +34,8 @@
const (
MsgFindKvFailed = "find kv failed, deadline exceeded"
MsgFindOneKey = "find one key"
+ MsgFindOneKeyByID = "find one key by id"
+ MsgFindMoreKey = "find more"
MsgHitExactLabels = "hit exact labels"
FmtErrFindKvFailed = "can not find kv in %s"
)
@@ -211,13 +212,21 @@
return nil, session.ErrMissingProject
}
+ if opts.ID != "" {
+ openlogging.Debug(MsgFindOneKeyByID, openlogging.WithTags(openlogging.Tags{
+ "id": opts.ID,
+ "key": opts.Key,
+ "labels": opts.Labels,
+ }))
+ return findKVByID(ctx, domain, project, opts.ID)
+ }
+
cur, _, err := findKV(ctx, domain, project, opts)
if err != nil {
return nil, err
}
defer cur.Close(ctx)
- kvResp := make([]*model.KVResponse, 0)
if opts.Depth == 0 && opts.Key != "" {
openlogging.Debug(MsgFindOneKey, openlogging.WithTags(
map[string]interface{}{
@@ -228,53 +237,10 @@
))
return cursorToOneKV(ctx, cur, opts.Labels)
}
- openlogging.Debug("find more", openlogging.WithTags(openlogging.Tags{
+ openlogging.Debug(MsgFindMoreKey, openlogging.WithTags(openlogging.Tags{
"depth": opts.Depth,
- "k": opts.Key,
+ "key": opts.Key,
"labels": opts.Labels,
}))
- for cur.Next(ctx) {
- curKV := &model.KVDoc{}
-
- if err := cur.Decode(curKV); err != nil {
- openlogging.Error("decode to KVs error: " + err.Error())
- return nil, err
- }
- if (len(curKV.Labels) - len(opts.Labels)) > opts.Depth {
- //because it is query by labels, so result can not be minus
- //so many labels,then continue
- openlogging.Debug("so deep, skip this key")
- continue
- }
- openlogging.Debug(fmt.Sprintf("%v", curKV))
- var groupExist bool
- var labelGroup *model.KVResponse
- for _, labelGroup = range kvResp {
- if reflect.DeepEqual(labelGroup.LabelDoc.Labels, curKV.Labels) {
- groupExist = true
- clearAll(curKV)
- labelGroup.Data = append(labelGroup.Data, curKV)
- break
- }
-
- }
- if !groupExist {
- labelGroup = &model.KVResponse{
- LabelDoc: &model.LabelDocResponse{
- Labels: curKV.Labels,
- LabelID: curKV.LabelID,
- },
- Data: []*model.KVDoc{curKV},
- }
- clearAll(curKV)
- openlogging.Debug("add new label group")
- kvResp = append(kvResp, labelGroup)
- }
-
- }
- if len(kvResp) == 0 {
- return nil, service.ErrKeyNotExists
- }
- return kvResp, nil
-
+ return findMoreKV(ctx, cur, &opts)
}
diff --git a/server/service/mongo/kv/tool.go b/server/service/mongo/kv/tool.go
index e3b1bdb..cbeb5af 100644
--- a/server/service/mongo/kv/tool.go
+++ b/server/service/mongo/kv/tool.go
@@ -19,6 +19,7 @@
import (
"context"
+ "fmt"
"reflect"
"github.com/apache/servicecomb-kie/pkg/model"
@@ -68,3 +69,63 @@
}
return nil, service.ErrKeyNotExists
}
+
+func findKVByID(ctx context.Context, domain, project, kvID string) ([]*model.KVResponse, error) {
+ kvResp := make([]*model.KVResponse, 0)
+ kv, err := findKVDocByID(ctx, domain, project, kvID)
+ if err != nil {
+ return nil, err
+ }
+ kvResp = append(kvResp, &model.KVResponse{
+ Total: 1,
+ Data: []*model.KVDoc{kv},
+ })
+ return kvResp, nil
+}
+
+func findMoreKV(ctx context.Context, cur *mongo.Cursor, opts *service.FindOptions) ([]*model.KVResponse, error) {
+ kvResp := make([]*model.KVResponse, 0)
+ for cur.Next(ctx) {
+ curKV := &model.KVDoc{}
+
+ if err := cur.Decode(curKV); err != nil {
+ openlogging.Error("decode to KVs error: " + err.Error())
+ return nil, err
+ }
+ if (len(curKV.Labels) - len(opts.Labels)) > opts.Depth {
+ //because it is query by labels, so result can not be minus
+ //so many labels,then continue
+ openlogging.Debug("so deep, skip this key")
+ continue
+ }
+ openlogging.Debug(fmt.Sprintf("%v", curKV))
+ var groupExist bool
+ var labelGroup *model.KVResponse
+ for _, labelGroup = range kvResp {
+ if reflect.DeepEqual(labelGroup.LabelDoc.Labels, curKV.Labels) {
+ groupExist = true
+ clearAll(curKV)
+ labelGroup.Data = append(labelGroup.Data, curKV)
+ break
+ }
+
+ }
+ if !groupExist {
+ labelGroup = &model.KVResponse{
+ LabelDoc: &model.LabelDocResponse{
+ Labels: curKV.Labels,
+ LabelID: curKV.LabelID,
+ },
+ Data: []*model.KVDoc{curKV},
+ }
+ clearAll(curKV)
+ openlogging.Debug("add new label group")
+ kvResp = append(kvResp, labelGroup)
+ }
+
+ }
+ if len(kvResp) == 0 {
+ return nil, service.ErrKeyNotExists
+ }
+ return kvResp, nil
+}
diff --git a/server/service/options.go b/server/service/options.go
index bca3a90..859ea3d 100644
--- a/server/service/options.go
+++ b/server/service/options.go
@@ -34,6 +34,7 @@
ExactLabels bool
Status string
Depth int
+ ID string
Key string
Labels map[string]string
LabelID string
@@ -53,6 +54,13 @@
}
}
+//WithID find by kvID
+func WithID(id string) FindOption {
+ return func(o *FindOptions) {
+ o.ID = id
+ }
+}
+
//WithKey find by key
func WithKey(key string) FindOption {
return func(o *FindOptions) {