[merge]merge dev to master (#317)
* fix the concurrent bug of KvIdCache
* fix the bug of do not report the error which occured in action of get kvdocs from etcd
* fix the bug of do not report the error which occured in action of get kvdocs from etcd
* resolve conflicts in master
* [fix] fix golangci-lint (#318)
Co-authored-by: songshiyuan 00649746 <songshiyuan3@huawei.com>
* fix the concurrent bug of KvIdCache
* fix the bug of do not report the error which occured in action of get kvdocs from etcd
* fix the bug of do not report the error which occured in action of get kvdocs from etcd
* resolve conflicts in master
---------
Co-authored-by: songshiyuan 00649746 <songshiyuan3@huawei.com>
diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go
index 61d017b..7cf6529 100644
--- a/server/datasource/etcd/kv/kv_cache.go
+++ b/server/datasource/etcd/kv/kv_cache.go
@@ -9,15 +9,16 @@
"sync"
"time"
- "github.com/apache/servicecomb-kie/pkg/model"
- "github.com/apache/servicecomb-kie/pkg/stringutil"
- "github.com/apache/servicecomb-kie/server/datasource"
- "github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
"github.com/go-chassis/openlog"
"github.com/little-cui/etcdadpt"
goCache "github.com/patrickmn/go-cache"
"go.etcd.io/etcd/api/v3/mvccpb"
+
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/pkg/stringutil"
+ "github.com/apache/servicecomb-kie/server/datasource"
+ "github.com/apache/servicecomb-kie/server/datasource/etcd/key"
)
func Init() {
@@ -35,8 +36,6 @@
backOffMinInterval = 5 * time.Second
)
-type IDSet map[string]struct{}
-
type Cache struct {
timeOut time.Duration
client etcdadpt.Client
@@ -158,11 +157,13 @@
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
if !ok {
- kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}})
+ z := &sync.Map{}
+ z.Store(kvDoc.ID, struct{}{})
+ kc.StoreKvIDSet(cacheKey, z)
openlog.Info("cacheKey " + cacheKey + "not exists")
continue
}
- m[kvDoc.ID] = struct{}{}
+ m.Store(kvDoc.ID, struct{}{})
}
}
@@ -180,23 +181,23 @@
openlog.Error("cacheKey " + cacheKey + "not exists")
continue
}
- delete(m, kvDoc.ID)
+ m.Delete(kvDoc.ID)
}
}
-func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) {
+func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) {
val, ok := kc.kvIDCache.Load(cacheKey)
if !ok {
return nil, false
}
- kvIds, ok := val.(IDSet)
+ kvIds, ok := val.(*sync.Map)
if !ok {
return nil, false
}
return kvIds, true
}
-func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) {
+func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) {
kc.kvIDCache.Store(cacheKey, kvIds)
}
@@ -220,9 +221,9 @@
kc.kvDocCache.Delete(kvID)
}
-func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) {
+func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
if !req.Opts.ExactLabels {
- return nil, false
+ return nil, false, nil
}
openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
@@ -232,22 +233,25 @@
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
- kvCache.StoreKvIDSet(cacheKey, IDSet{})
- return result, true
+ kvCache.StoreKvIDSet(cacheKey, &sync.Map{})
+ return result, true, nil
}
var docs []*model.KVDoc
var kvIdsLeft []string
- for kvID := range kvIds {
- if doc, ok := kvCache.LoadKvDoc(kvID); ok {
+ kvIds.Range(func(kvID, value any) bool {
+ if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok {
docs = append(docs, doc)
- continue
+ } else {
+ kvIdsLeft = append(kvIdsLeft, kvID.(string))
}
- kvIdsLeft = append(kvIdsLeft, kvID)
+ return true
+ })
+ tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
+ if err != nil {
+ return nil, true, err
}
-
- tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)
for _, doc := range docs {
@@ -257,17 +261,18 @@
}
}
result.Total = len(result.Data)
- return result, true
+ return result, true, nil
}
-func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
+func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) ([]*model.KVDoc, error) {
if len(kvIdsLeft) == 0 {
- return nil
+ return nil, nil
}
openlog.Debug("get kv from etcd by kvId")
wg := sync.WaitGroup{}
docs := make([]*model.KVDoc, len(kvIdsLeft))
+ var getKvErr error
for i, kvID := range kvIdsLeft {
wg.Add(1)
go func(kvID string, cnt int) {
@@ -277,12 +282,14 @@
kv, err := etcdadpt.Get(ctx, docKey)
if err != nil {
openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err))
+ getKvErr = err
return
}
doc, err := kc.GetKvDoc(kv)
if err != nil {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
+ getKvErr = err
return
}
@@ -291,7 +298,10 @@
}(kvID, i)
}
wg.Wait()
- return docs
+ if getKvErr != nil {
+ return nil, getKvErr
+ }
+ return docs, nil
}
func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
diff --git a/server/datasource/etcd/kv/kv_dao.go b/server/datasource/etcd/kv/kv_dao.go
index 2332b58..d7260df 100644
--- a/server/datasource/etcd/kv/kv_dao.go
+++ b/server/datasource/etcd/kv/kv_dao.go
@@ -524,15 +524,18 @@
}
if Enabled() {
- result, useCache := Search(ctx, &CacheSearchReq{
+ result, useCache, err := Search(ctx, &CacheSearchReq{
Domain: domain,
Project: project,
Opts: &opts,
Regex: regex,
})
- if useCache {
+ if useCache && err == nil {
return result, opts, nil
}
+ if useCache && err != nil {
+ openlog.Error("using cache to search kv failed: " + err.Error())
+ }
}
result, err := matchLabelsSearch(ctx, domain, project, regex, opts)