[feat] improve list interface's performance (#285)
* [feat] improve list interface's performance
* [feat] improve list interface's performance, add kvDoc cache layer
* [feat] improve list interface's performance, add kvDoc and kvId cache layer
* [feat] improve list interface's performance, modify review comments
* [feat] improve list interface's performance, format code
diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml
index cd29af6..b977042 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -15,7 +15,7 @@
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
- version: v1.48.0
+ version: v1.51.2
args: --enable gofmt,gocyclo,goimports,dupl,gosec --timeout 5m --skip-dirs=examples,test --skip-files=.*_test.go$
static-checks:
runs-on: ubuntu-latest
diff --git a/go.mod b/go.mod
index 43ef4cd..eb667ad 100644
--- a/go.mod
+++ b/go.mod
@@ -12,8 +12,10 @@
github.com/gofrs/uuid v4.0.0+incompatible
github.com/hashicorp/serf v0.9.5
github.com/little-cui/etcdadpt v0.3.2
+ github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.7.1
github.com/urfave/cli v1.22.4
+ go.etcd.io/etcd/api/v3 v3.5.0
go.mongodb.org/mongo-driver v1.5.1
gopkg.in/yaml.v2 v2.4.0
)
@@ -36,7 +38,7 @@
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
- github.com/fatih/color v1.9.0 // indirect
+ github.com/fatih/color v1.10.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-chassis/go-restful-swagger20 v1.0.4-0.20220704025524-9243cbee26b7 // indirect
@@ -70,9 +72,9 @@
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/karlseguin/ccache/v2 v2.0.8 // indirect
- github.com/klauspost/compress v1.9.5 // indirect
+ github.com/klauspost/compress v1.11.0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
- github.com/mattn/go-colorable v0.1.7 // indirect
+ github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/miekg/dns v1.1.26 // indirect
@@ -80,8 +82,8 @@
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
+ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/opentracing/opentracing-go v1.1.0 // indirect
- github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/posener/complete v1.2.3 // indirect
@@ -103,7 +105,6 @@
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.etcd.io/bbolt v1.3.6 // indirect
- go.etcd.io/etcd/api/v3 v3.5.0 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect
go.etcd.io/etcd/client/v2 v2.305.0 // indirect
go.etcd.io/etcd/client/v3 v3.5.0 // indirect
@@ -132,6 +133,7 @@
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
google.golang.org/grpc v1.40.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
+ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/apimachinery v0.19.5 // indirect
diff --git a/go.sum b/go.sum
index 6691854..e67db89 100644
--- a/go.sum
+++ b/go.sum
@@ -151,8 +151,9 @@
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
-github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
+github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
+github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -418,14 +419,14 @@
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/klauspost/compress v1.9.5 h1:U+CaK85mrNNb4k8BNOfgJtJ/gr6kswUCFj6miSzVC6M=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
+github.com/klauspost/compress v1.11.0 h1:wJbzvpYMVGG9iTI9VxpnNZfd4DzMPoCWze3GgSqz8yg=
+github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
-github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@@ -442,8 +443,8 @@
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
-github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3mdw=
-github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
+github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
+github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
@@ -479,6 +480,8 @@
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -1010,8 +1013,9 @@
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM=
diff --git a/server/datasource/etcd/init.go b/server/datasource/etcd/init.go
index b2f68b2..82e6187 100644
--- a/server/datasource/etcd/init.go
+++ b/server/datasource/etcd/init.go
@@ -31,6 +31,7 @@
}
func NewFrom(c *datasource.Config) (datasource.Broker, error) {
+ kv.Init()
return &Broker{}, nil
}
func (*Broker) GetRevisionDao() datasource.RevisionDao {
diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go
new file mode 100644
index 0000000..e3d47e6
--- /dev/null
+++ b/server/datasource/etcd/kv/kv_cache.go
@@ -0,0 +1,326 @@
+package kv
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "regexp"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/pkg/stringutil"
+ "github.com/apache/servicecomb-kie/server/datasource"
+ "github.com/apache/servicecomb-kie/server/datasource/etcd/key"
+ "github.com/go-chassis/foundation/backoff"
+ "github.com/go-chassis/openlog"
+ "github.com/little-cui/etcdadpt"
+ goCache "github.com/patrickmn/go-cache"
+ "go.etcd.io/etcd/api/v3/mvccpb"
+)
+
+func Init() {
+ kvCache = NewKvCache()
+ go kvCache.Refresh(context.Background())
+}
+
+var kvCache *Cache
+
+const (
+ prefixKvs = "kvs"
+ cacheExpirationTime = 10 * time.Minute
+ cacheCleanupInterval = 11 * time.Minute
+ etcdWatchTimeout = 1 * time.Hour
+ backOffMinInterval = 5 * time.Second
+)
+
+type IDSet map[string]struct{}
+
+type Cache struct {
+ timeOut time.Duration
+ client etcdadpt.Client
+ revision int64
+ kvIDCache sync.Map
+ kvDocCache *goCache.Cache
+}
+
+func NewKvCache() *Cache {
+ kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval)
+ return &Cache{
+ timeOut: etcdWatchTimeout,
+ client: etcdadpt.Instance(),
+ revision: 0,
+ kvDocCache: kvDocCache,
+ }
+}
+
+func Enabled() bool {
+ return kvCache != nil
+}
+
+type CacheSearchReq struct {
+ Domain string
+ Project string
+ Opts *datasource.FindOptions
+ Regex *regexp.Regexp
+}
+
+func (kc *Cache) Refresh(ctx context.Context) {
+ openlog.Info("start to list and watch")
+ retries := 0
+
+ timer := time.NewTimer(backOffMinInterval)
+ defer timer.Stop()
+ for {
+ nextPeriod := backOffMinInterval
+ if err := kc.listWatch(ctx); err != nil {
+ retries++
+ nextPeriod = backoff.GetBackoff().Delay(retries)
+ } else {
+ retries = 0
+ }
+
+ select {
+ case <-ctx.Done():
+ openlog.Info("stop to list and watch")
+ return
+ case <-timer.C:
+ timer.Reset(nextPeriod)
+ }
+ }
+}
+
+func (kc *Cache) listWatch(ctx context.Context) error {
+ rsp, err := kc.list(ctx)
+ if err != nil {
+ return err
+ }
+ kc.revision = rsp.Revision
+
+ kc.cachePut(rsp)
+
+ return kc.watch(ctx)
+}
+
+func (kc *Cache) watch(ctx context.Context) error {
+ timoutCtx, cancel := context.WithTimeout(ctx, kc.timeOut)
+ defer cancel()
+
+ rev := kc.revision
+ opts := append(
+ etcdadpt.WatchPrefixOpOptions(prefixKvs),
+ etcdadpt.WithRev(kc.revision+1),
+ etcdadpt.WithWatchCallback(kc.watchCallBack),
+ )
+ err := kc.client.Watch(timoutCtx, opts...)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("watch prefix %s failed, start rev: %d+1->%d->0, err %v", prefixKvs, rev, kc.revision, err))
+ kc.revision = 0
+ }
+ return err
+}
+
+func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) {
+ rsp, err := kc.client.Do(ctx, etcdadpt.WatchPrefixOpOptions(prefixKvs)...)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("list prefix %s failed, current rev: %d, err, %v", prefixKvs, kc.revision, err))
+ return nil, err
+ }
+ return rsp, nil
+}
+
+func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error {
+ if rsp == nil || len(rsp.Kvs) == 0 {
+ return fmt.Errorf("unknown event")
+ }
+ kc.revision = rsp.Revision
+
+ switch rsp.Action {
+ case etcdadpt.ActionPut:
+ kc.cachePut(rsp)
+ case etcdadpt.ActionDelete:
+ kc.cacheDelete(rsp)
+ default:
+ openlog.Warn(fmt.Sprintf("unrecognized action::%v", rsp.Action))
+ }
+ return nil
+}
+
+func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
+ for _, kv := range rsp.Kvs {
+ kvDoc, err := kc.GetKvDoc(kv)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
+ continue
+ }
+ kc.StoreKvDoc(kvDoc.ID, kvDoc)
+ cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
+ m, ok := kc.LoadKvIDSet(cacheKey)
+ if !ok {
+ kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}})
+ openlog.Info("cacheKey " + cacheKey + "not exists")
+ continue
+ }
+ m[kvDoc.ID] = struct{}{}
+ }
+}
+
+func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) {
+ for _, kv := range rsp.Kvs {
+ kvDoc, err := kc.GetKvDoc(kv)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
+ continue
+ }
+ kc.DeleteKvDoc(kvDoc.ID)
+ cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
+ m, ok := kc.LoadKvIDSet(cacheKey)
+ if !ok {
+ openlog.Error("cacheKey " + cacheKey + "not exists")
+ continue
+ }
+ delete(m, kvDoc.ID)
+ }
+}
+
+func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) {
+ val, ok := kc.kvIDCache.Load(cacheKey)
+ if !ok {
+ return nil, false
+ }
+ kvIds, ok := val.(IDSet)
+ if !ok {
+ return nil, false
+ }
+ return kvIds, true
+}
+
+func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) {
+ kc.kvIDCache.Store(cacheKey, kvIds)
+}
+
+func (kc *Cache) LoadKvDoc(kvID string) (*model.KVDoc, bool) {
+ val, ok := kc.kvDocCache.Get(kvID)
+ if !ok {
+ return nil, false
+ }
+ doc, ok := val.(*model.KVDoc)
+ if !ok {
+ return nil, false
+ }
+ return doc, true
+}
+
+func (kc *Cache) StoreKvDoc(kvID string, kvDoc *model.KVDoc) {
+ kc.kvDocCache.SetDefault(kvID, kvDoc)
+}
+
+func (kc *Cache) DeleteKvDoc(kvID string) {
+ kc.kvDocCache.Delete(kvID)
+}
+
+func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
+ if !req.Opts.ExactLabels {
+ return nil, false, nil
+ }
+
+ openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
+ result := &model.KVResponse{}
+ cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
+ kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
+ if !ok {
+ kvCache.StoreKvIDSet(cacheKey, IDSet{})
+ return result, true, nil
+ }
+
+ var docs []*model.KVDoc
+
+ var kvIdsLeft []string
+ for kvID := range kvIds {
+ if doc, ok := kvCache.LoadKvDoc(kvID); ok {
+ docs = append(docs, doc)
+ continue
+ }
+ kvIdsLeft = append(kvIdsLeft, kvID)
+ }
+
+ tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
+ docs = append(docs, tpData...)
+
+ for _, doc := range docs {
+ if isMatch(req, doc) {
+ datasource.ClearPart(doc)
+ result.Data = append(result.Data, doc)
+ }
+ }
+ result.Total = len(result.Data)
+ return result, true, nil
+}
+
+func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
+ if len(kvIdsLeft) == 0 {
+ return nil
+ }
+
+ openlog.Debug("get kv from etcd by kvId")
+ wg := sync.WaitGroup{}
+ docs := make([]*model.KVDoc, len(kvIdsLeft))
+ for i, kvID := range kvIdsLeft {
+ wg.Add(1)
+ go func(kvID string, cnt int) {
+ defer wg.Done()
+
+ docKey := key.KV(req.Domain, req.Project, kvID)
+ kv, err := etcdadpt.Get(ctx, docKey)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err))
+ return
+ }
+
+ doc, err := kc.GetKvDoc(kv)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
+ return
+ }
+
+ kc.StoreKvDoc(doc.ID, doc)
+ docs[cnt] = doc
+ }(kvID, i)
+ }
+ wg.Wait()
+ return docs
+}
+
+func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
+ if doc == nil {
+ return false
+ }
+ if req.Opts.Status != "" && doc.Status != req.Opts.Status {
+ return false
+ }
+ if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
+ return false
+ }
+ return true
+}
+
+func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) {
+ kvDoc := &model.KVDoc{}
+ err := json.Unmarshal(kv.Value, kvDoc)
+ if err != nil {
+ return nil, err
+ }
+ return kvDoc, nil
+}
+
+func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) string {
+ labelFormat := stringutil.FormatMap(labels)
+ inputKey := strings.Join([]string{
+ "",
+ domain,
+ project,
+ labelFormat,
+ }, "/")
+ return inputKey
+}
diff --git a/server/datasource/etcd/kv/kv_cache_test.go b/server/datasource/etcd/kv/kv_cache_test.go
new file mode 100644
index 0000000..2286699
--- /dev/null
+++ b/server/datasource/etcd/kv/kv_cache_test.go
@@ -0,0 +1,98 @@
+package kv
+
+import (
+ "testing"
+
+ "github.com/little-cui/etcdadpt"
+ "github.com/stretchr/testify/assert"
+ "go.etcd.io/etcd/api/v3/mvccpb"
+)
+
+type args struct {
+ rsp *etcdadpt.Response
+}
+
+func TestCachePut(t *testing.T) {
+ tests := []struct {
+ name string
+ args args
+ want int
+ }{
+ {"put 0 kvDoc, cache should store 0 kvDoc",
+ args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}},
+ 0,
+ },
+ {"put 1 kvDoc, cache should store 1 kvDoc",
+ args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
+ }}},
+ 1,
+ },
+ {"put 2 kvDocs with different kvIds, cache should store 2 kvDocs",
+ args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
+ {Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
+ }}},
+ 2,
+ },
+ {"put 2 kvDocs with same kvId, cache should store 1 kvDocs",
+ args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
+ {Value: []byte(`{"id":"1", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
+ }}},
+ 1,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ kc := NewKvCache()
+ kc.cachePut(tt.args.rsp)
+ num := kc.kvDocCache.ItemCount()
+ assert.Equal(t, tt.want, num)
+ })
+ }
+}
+
+func TestCacheDelete(t *testing.T) {
+ tests := []struct {
+ name string
+ args args
+ want int
+ }{
+ {"first put 2 kvDocs, then delete 0 kvDoc, cache should store 2 kvDocs",
+ args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}}},
+ 2,
+ },
+ {"first put 2 kvDocs, then delete kvId=1, cache should store 1 kvDocs",
+ args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{
+ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
+ }}},
+ 1,
+ },
+ {"first put 2 kvDocs, then delete kvId=1 and kvId=2, cache should store 0 kvDocs",
+ args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
+ {Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
+ }}},
+ 0,
+ },
+ {"first put 2 kvDocs, then delete non-exist kvId=0, cache should store 2 kvDocs",
+ args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {Value: []byte(`{"id":"0", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
+ }}},
+ 2,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ kc := NewKvCache()
+ kc.cachePut(&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
+ {Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
+ }})
+ kc.cacheDelete(tt.args.rsp)
+ num := kc.kvDocCache.ItemCount()
+ assert.Equal(t, tt.want, num)
+ })
+ }
+}
diff --git a/server/datasource/etcd/kv/kv_dao.go b/server/datasource/etcd/kv/kv_dao.go
index 8fb15c3..6c1745d 100644
--- a/server/datasource/etcd/kv/kv_dao.go
+++ b/server/datasource/etcd/kv/kv_dao.go
@@ -23,15 +23,14 @@
"regexp"
"strings"
- "github.com/go-chassis/cari/sync"
- "github.com/go-chassis/openlog"
- "github.com/little-cui/etcdadpt"
-
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/util"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/auth"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
+ "github.com/go-chassis/cari/sync"
+ "github.com/go-chassis/openlog"
+ "github.com/little-cui/etcdadpt"
)
// Dao operate data in mongodb
@@ -522,12 +521,37 @@
if err != nil {
return nil, opts, err
}
- // TODO may be OOM
- kvs, _, err := etcdadpt.List(ctx, key.KVList(domain, project))
+
+ if Enabled() {
+ result, useCache, err := Search(ctx, &CacheSearchReq{
+ Domain: domain,
+ Project: project,
+ Opts: &opts,
+ Regex: regex,
+ })
+ if useCache && err == nil {
+ return result, opts, nil
+ }
+ if useCache && err != nil {
+ openlog.Error("using cache to search kv failed: " + err.Error())
+ }
+ }
+
+ result, err := matchLabelsSearch(ctx, domain, project, regex, opts)
if err != nil {
openlog.Error("list kv failed: " + err.Error())
return nil, opts, err
}
+
+ return result, opts, nil
+}
+
+func matchLabelsSearch(ctx context.Context, domain, project string, regex *regexp.Regexp, opts datasource.FindOptions) (*model.KVResponse, error) {
+ openlog.Debug("using labels to search kv")
+ kvs, _, err := etcdadpt.List(ctx, key.KVList(domain, project))
+ if err != nil {
+ return nil, err
+ }
result := &model.KVResponse{
Data: []*model.KVDoc{},
}
@@ -551,7 +575,7 @@
}
}
- return result, opts, nil
+ return result, nil
}
func IsUniqueFind(opts datasource.FindOptions) bool {
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index b98030d..b5bc2cb 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -97,7 +97,14 @@
kv.ValueType = datasource.DefaultValueType
}
//check whether the project has certain labels or not
- exist, err := datasource.GetBroker().GetKVDao().Exist(ctx, kv.Key, kv.Project, kv.Domain, datasource.WithLabelFormat(kv.LabelFormat))
+ exist, err := datasource.GetBroker().GetKVDao().Exist(
+ ctx,
+ kv.Key,
+ kv.Project,
+ kv.Domain,
+ datasource.WithLabelFormat(kv.LabelFormat),
+ datasource.WithLabels(kv.Labels),
+ )
if err != nil {
openlog.Error(err.Error())
return nil, config.NewError(config.ErrInternal, "create kv failed")
@@ -311,7 +318,14 @@
labels = map[string]string{}
}
labelFormat := stringutil.FormatMap(labels)
- exist, err := datasource.GetBroker().GetKVDao().Exist(ctx, key, project, domain, datasource.WithLabelFormat(labelFormat))
+ exist, err := datasource.GetBroker().GetKVDao().Exist(
+ ctx,
+ key,
+ project,
+ domain,
+ datasource.WithLabelFormat(labelFormat),
+ datasource.WithLabels(labels),
+ )
if err != nil {
openlog.Error(err.Error())
return false, err
@@ -324,7 +338,14 @@
labels = map[string]string{}
}
labelFormat := stringutil.FormatMap(labels)
- kvs, err := datasource.GetBroker().GetKVDao().GetByKey(ctx, key, project, domain, datasource.WithLabelFormat(labelFormat))
+ kvs, err := datasource.GetBroker().GetKVDao().GetByKey(
+ ctx,
+ key,
+ project,
+ domain,
+ datasource.WithLabelFormat(labelFormat),
+ datasource.WithLabels(labels),
+ )
if err != nil {
openlog.Error(err.Error())
return nil, err