SCB-993 search sc cluster when key mismatch with aggregator cache (#493)
* SCB-993 search sc cluster when key mismatch with aggregator cache
* SCB-993 New revision mechanism
* SCB-993 Add global flag in query api
* SCB-993 Prevent aggregator from entering infinite recursion
* SCB-993 Optimize code
diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
index 331666e..bfd337e 100644
--- a/pkg/cache/cache_test.go
+++ b/pkg/cache/cache_test.go
@@ -26,12 +26,12 @@
type level1 struct {
}
-func (l *level1) Name(ctx context.Context) string {
+func (l *level1) Name(ctx context.Context, _ *Node) string {
return ctx.Value("key1").(string)
}
func (l *level1) Init(ctx context.Context, parent *Node) (node *Node, err error) {
- p := l.Name(ctx)
+ p := l.Name(ctx, parent)
if p == "err" {
return nil, fmt.Errorf("wrong logic")
}
@@ -51,7 +51,7 @@
changed string
}
-func (l *level2) Name(ctx context.Context) string {
+func (l *level2) Name(ctx context.Context, _ *Node) string {
return ctx.Value("key2").(string)
}
@@ -60,7 +60,7 @@
return
}
- p := l.Name(ctx)
+ p := l.Name(ctx, parent)
if p == "err" {
return nil, fmt.Errorf("wrong logic")
}
diff --git a/pkg/cache/filter.go b/pkg/cache/filter.go
index 51c6448..9652fcf 100644
--- a/pkg/cache/filter.go
+++ b/pkg/cache/filter.go
@@ -19,6 +19,6 @@
import "golang.org/x/net/context"
type Filter interface {
- Name(ctx context.Context) string
+ Name(ctx context.Context, parent *Node) string
Init(ctx context.Context, parent *Node) (*Node, error)
}
diff --git a/pkg/cache/tree.go b/pkg/cache/tree.go
index 9c89df0..d9adcf5 100644
--- a/pkg/cache/tree.go
+++ b/pkg/cache/tree.go
@@ -77,7 +77,7 @@
return
}
- t.roots.Delete(t.filters[0].Name(ctx))
+ t.roots.Delete(t.filters[0].Name(ctx, nil))
}
func (t *Tree) getOrCreateRoot(ctx context.Context) (node *Node, err error) {
@@ -86,7 +86,7 @@
}
filter := t.filters[0]
- name := filter.Name(ctx)
+ name := filter.Name(ctx, nil)
item, err := t.roots.Fetch(name, t.Config.TTL(), func() (interface{}, error) {
node, err := t.getOrCreateNode(ctx, 0, nil)
if err != nil {
@@ -108,7 +108,7 @@
func (t *Tree) getOrCreateNode(ctx context.Context, idx int, parent *Node) (node *Node, err error) {
filter := t.filters[idx]
- name := t.nodeFullName(filter.Name(ctx), parent)
+ name := t.nodeFullName(filter.Name(ctx, parent), parent)
if parent == nil {
// new a temp node
diff --git a/pkg/client/sc/apis.go b/pkg/client/sc/apis.go
index e0103af..3fb8f1a 100644
--- a/pkg/client/sc/apis.go
+++ b/pkg/client/sc/apis.go
@@ -25,17 +25,22 @@
scerr "github.com/apache/servicecomb-service-center/server/error"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
"github.com/apache/servicecomb-service-center/version"
+ "golang.org/x/net/context"
"io/ioutil"
"net/http"
)
const (
- apiVersionURL = "/version"
- apiDumpURL = "/v4/default/admin/dump"
- apiClustersURL = "/v4/default/admin/clusters"
- apiHealthURL = "/v4/default/registry/health"
- apiSchemasURL = "/v4/%s/registry/microservices/%s/schemas"
- apiSchemaURL = "/v4/%s/registry/microservices/%s/schemas/%s"
+ apiVersionURL = "/version"
+ apiDumpURL = "/v4/default/admin/dump"
+ apiClustersURL = "/v4/default/admin/clusters"
+ apiHealthURL = "/v4/default/registry/health"
+ apiSchemasURL = "/v4/%s/registry/microservices/%s/schemas"
+ apiSchemaURL = "/v4/%s/registry/microservices/%s/schemas/%s"
+ apiInstancesURL = "/v4/%s/registry/microservices/%s/instances"
+ apiInstanceURL = "/v4/%s/registry/microservices/%s/instances/%s"
+
+ QueryGlobal = "global"
)
func (c *SCClient) toError(body []byte) *scerr.Error {
@@ -47,8 +52,18 @@
return message
}
-func (c *SCClient) GetScVersion() (*version.VersionSet, *scerr.Error) {
- resp, err := c.RestDo(http.MethodGet, apiVersionURL, c.CommonHeaders(), nil)
+func (c *SCClient) parseQuery(ctx context.Context) (q string) {
+ switch {
+ case ctx.Value(QueryGlobal) == "1":
+ q += "global=true"
+ default:
+ q += "global=false"
+ }
+ return
+}
+
+func (c *SCClient) GetScVersion(ctx context.Context) (*version.VersionSet, *scerr.Error) {
+ resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiVersionURL, c.CommonHeaders(ctx), nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
@@ -66,18 +81,17 @@
v := &version.VersionSet{}
err = json.Unmarshal(body, v)
if err != nil {
- fmt.Println(string(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
return v, nil
}
-func (c *SCClient) GetScCache() (*model.Cache, *scerr.Error) {
- headers := c.CommonHeaders()
+func (c *SCClient) GetScCache(ctx context.Context) (*model.Cache, *scerr.Error) {
+ headers := c.CommonHeaders(ctx)
// only default domain has admin permission
headers.Set("X-Domain-Name", "default")
- resp, err := c.RestDo(http.MethodGet, apiDumpURL, headers, nil)
+ resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiDumpURL, headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
@@ -95,19 +109,18 @@
dump := &model.DumpResponse{}
err = json.Unmarshal(body, dump)
if err != nil {
- fmt.Println(string(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
return dump.Cache, nil
}
-func (c *SCClient) GetSchemasByServiceId(domainProject, serviceId string) ([]*pb.Schema, *scerr.Error) {
+func (c *SCClient) GetSchemasByServiceId(ctx context.Context, domainProject, serviceId string) ([]*pb.Schema, *scerr.Error) {
domain, project := core.FromDomainProject(domainProject)
- headers := c.CommonHeaders()
+ headers := c.CommonHeaders(ctx)
headers.Set("X-Domain-Name", domain)
- resp, err := c.RestDo(http.MethodGet,
- fmt.Sprintf(apiSchemasURL, project, serviceId)+"?withSchema=1",
+ resp, err := c.RestDoWithContext(ctx, http.MethodGet,
+ fmt.Sprintf(apiSchemasURL, project, serviceId)+"?withSchema=1&"+c.parseQuery(ctx),
headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
@@ -126,19 +139,18 @@
schemas := &pb.GetAllSchemaResponse{}
err = json.Unmarshal(body, schemas)
if err != nil {
- fmt.Println(util.BytesToStringWithNoCopy(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
return schemas.Schemas, nil
}
-func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId string) (*pb.Schema, *scerr.Error) {
+func (c *SCClient) GetSchemaBySchemaId(ctx context.Context, domainProject, serviceId, schemaId string) (*pb.Schema, *scerr.Error) {
domain, project := core.FromDomainProject(domainProject)
- headers := c.CommonHeaders()
+ headers := c.CommonHeaders(ctx)
headers.Set("X-Domain-Name", domain)
- resp, err := c.RestDo(http.MethodGet,
- fmt.Sprintf(apiSchemaURL, project, serviceId, schemaId),
+ resp, err := c.RestDoWithContext(ctx, http.MethodGet,
+ fmt.Sprintf(apiSchemaURL, project, serviceId, schemaId)+"?"+c.parseQuery(ctx),
headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
@@ -157,7 +169,6 @@
schema := &pb.GetSchemaResponse{}
err = json.Unmarshal(body, schema)
if err != nil {
- fmt.Println(util.BytesToStringWithNoCopy(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
@@ -168,11 +179,11 @@
}, nil
}
-func (c *SCClient) GetClusters() (registry.Clusters, *scerr.Error) {
- headers := c.CommonHeaders()
+func (c *SCClient) GetClusters(ctx context.Context) (registry.Clusters, *scerr.Error) {
+ headers := c.CommonHeaders(ctx)
// only default domain has admin permission
headers.Set("X-Domain-Name", "default")
- resp, err := c.RestDo(http.MethodGet, apiClustersURL, headers, nil)
+ resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiClustersURL, headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
@@ -190,18 +201,17 @@
clusters := &model.ClustersResponse{}
err = json.Unmarshal(body, clusters)
if err != nil {
- fmt.Println(string(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
return clusters.Clusters, nil
}
-func (c *SCClient) HealthCheck() *scerr.Error {
- headers := c.CommonHeaders()
+func (c *SCClient) HealthCheck(ctx context.Context) *scerr.Error {
+ headers := c.CommonHeaders(ctx)
// only default domain has admin permission
headers.Set("X-Domain-Name", "default")
- resp, err := c.RestDo(http.MethodGet, apiHealthURL, headers, nil)
+ resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiHealthURL, headers, nil)
if err != nil {
return scerr.NewError(scerr.ErrUnavailableBackend, err.Error())
}
@@ -217,3 +227,65 @@
}
return nil
}
+
+func (c *SCClient) GetInstancesByServiceId(ctx context.Context, domainProject, providerId, consumerId string) ([]*pb.MicroServiceInstance, *scerr.Error) {
+ domain, project := core.FromDomainProject(domainProject)
+ headers := c.CommonHeaders(ctx)
+ headers.Set("X-Domain-Name", domain)
+ headers.Set("X-ConsumerId", consumerId)
+ resp, err := c.RestDoWithContext(ctx, http.MethodGet,
+ fmt.Sprintf(apiInstancesURL, project, providerId)+"?"+c.parseQuery(ctx),
+ headers, nil)
+ if err != nil {
+ return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+ }
+ defer resp.Body.Close()
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, c.toError(body)
+ }
+
+ instancesResp := &pb.GetInstancesResponse{}
+ err = json.Unmarshal(body, instancesResp)
+ if err != nil {
+ return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+ }
+
+ return instancesResp.Instances, nil
+}
+
+func (c *SCClient) GetInstanceByInstanceId(ctx context.Context, domainProject, providerId, instanceId, consumerId string) (*pb.MicroServiceInstance, *scerr.Error) {
+ domain, project := core.FromDomainProject(domainProject)
+ headers := c.CommonHeaders(ctx)
+ headers.Set("X-Domain-Name", domain)
+ headers.Set("X-ConsumerId", consumerId)
+ resp, err := c.RestDoWithContext(ctx, http.MethodGet,
+ fmt.Sprintf(apiInstanceURL, project, providerId, instanceId)+"?"+c.parseQuery(ctx),
+ headers, nil)
+ if err != nil {
+ return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+ }
+ defer resp.Body.Close()
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, c.toError(body)
+ }
+
+ instanceResp := &pb.GetOneInstanceResponse{}
+ err = json.Unmarshal(body, instanceResp)
+ if err != nil {
+ return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+ }
+
+ return instanceResp.Instance, nil
+}
diff --git a/pkg/client/sc/client.go b/pkg/client/sc/client.go
index 18ee9db..570d5a2 100644
--- a/pkg/client/sc/client.go
+++ b/pkg/client/sc/client.go
@@ -16,6 +16,7 @@
package sc
import (
+ "golang.org/x/net/context"
"net/http"
)
@@ -32,8 +33,9 @@
Cfg Config
}
-func (c *SCClient) CommonHeaders() http.Header {
+func (c *SCClient) CommonHeaders(ctx context.Context) http.Header {
var headers = make(http.Header)
+ // TODO overwrote by context values
if len(c.Cfg.Token) > 0 {
headers.Set("X-Auth-Token", c.Cfg.Token)
}
diff --git a/pkg/client/sc/client_lb.go b/pkg/client/sc/client_lb.go
index d66b15a..914b902 100644
--- a/pkg/client/sc/client_lb.go
+++ b/pkg/client/sc/client_lb.go
@@ -19,6 +19,7 @@
"github.com/apache/servicecomb-service-center/pkg/lb"
"github.com/apache/servicecomb-service-center/pkg/rest"
"github.com/apache/servicecomb-service-center/pkg/util"
+ "golang.org/x/net/context"
"net/http"
)
@@ -44,9 +45,9 @@
return c.LB.Next()
}
-func (c *LBClient) RestDo(method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) {
+func (c *LBClient) RestDoWithContext(ctx context.Context, method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) {
for i := 0; i < c.Retries; i++ {
- resp, err = c.HttpDo(method, c.Next()+api, headers, body)
+ resp, err = c.HttpDoWithContext(ctx, method, c.Next()+api, headers, body)
if err != nil {
util.GetBackoff().Delay(i)
continue
diff --git a/pkg/rest/client.go b/pkg/rest/client.go
index 495cf86..4a2d2f6 100644
--- a/pkg/rest/client.go
+++ b/pkg/rest/client.go
@@ -22,6 +22,7 @@
"fmt"
"github.com/apache/servicecomb-service-center/pkg/tlsutil"
"github.com/apache/servicecomb-service-center/pkg/util"
+ "golang.org/x/net/context"
"net/http"
"net/url"
"os"
@@ -64,7 +65,7 @@
Cfg URLClientOption
}
-func (client *URLClient) HttpDo(method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) {
+func (client *URLClient) HttpDoWithContext(ctx context.Context, method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) {
if strings.HasPrefix(rawURL, "https") {
if transport, ok := client.Client.Transport.(*http.Transport); ok {
transport.TLSClientConfig = client.TLS
@@ -93,6 +94,7 @@
if err != nil {
return nil, errors.New(fmt.Sprintf("create request failed: %s", err.Error()))
}
+ req = req.WithContext(ctx)
req.Header = headers
resp, err = client.Client.Do(req)
@@ -122,6 +124,10 @@
return resp, nil
}
+func (client *URLClient) HttpDo(method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) {
+ return client.HttpDoWithContext(context.Background(), method, rawURL, headers, body)
+}
+
func setOptionDefaultValue(o *URLClientOption) URLClientOption {
if o == nil {
return defaultURLClientOption
diff --git a/pkg/util/util.go b/pkg/util/util.go
index c89f9d5..d824edf 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -147,3 +147,11 @@
}
timer.Reset(d)
}
+
+func StringTRUE(s string) bool {
+ s = strings.ToLower(strings.TrimSpace(s))
+ if s == "1" || s == "true" {
+ return true
+ }
+ return false
+}
diff --git a/scctl/pkg/plugin/diagnose/diagnose.go b/scctl/pkg/plugin/diagnose/diagnose.go
index 7d50126..b3e2833 100644
--- a/scctl/pkg/plugin/diagnose/diagnose.go
+++ b/scctl/pkg/plugin/diagnose/diagnose.go
@@ -65,7 +65,7 @@
}
// query sc
- cache, scErr := scClient.GetScCache()
+ cache, scErr := scClient.GetScCache(context.Background())
if scErr != nil {
cmd.StopAndExit(cmd.ExitError, scErr)
}
diff --git a/scctl/pkg/plugin/get/cluster/cluster_cmd.go b/scctl/pkg/plugin/get/cluster/cluster_cmd.go
index c3d2327..012c7ba 100644
--- a/scctl/pkg/plugin/get/cluster/cluster_cmd.go
+++ b/scctl/pkg/plugin/get/cluster/cluster_cmd.go
@@ -21,6 +21,7 @@
"github.com/apache/servicecomb-service-center/scctl/pkg/plugin/get"
"github.com/apache/servicecomb-service-center/scctl/pkg/writer"
"github.com/spf13/cobra"
+ "golang.org/x/net/context"
)
func init() {
@@ -43,7 +44,7 @@
if err != nil {
cmd.StopAndExit(cmd.ExitError, err)
}
- clusters, scErr := scClient.GetClusters()
+ clusters, scErr := scClient.GetClusters(context.Background())
if scErr != nil {
cmd.StopAndExit(cmd.ExitError, scErr)
}
diff --git a/scctl/pkg/plugin/get/instance/instance_cmd.go b/scctl/pkg/plugin/get/instance/instance_cmd.go
index e698ada..4161e8c 100644
--- a/scctl/pkg/plugin/get/instance/instance_cmd.go
+++ b/scctl/pkg/plugin/get/instance/instance_cmd.go
@@ -24,6 +24,7 @@
admin "github.com/apache/servicecomb-service-center/server/admin/model"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/spf13/cobra"
+ "golang.org/x/net/context"
"strings"
)
@@ -47,7 +48,7 @@
if err != nil {
cmd.StopAndExit(cmd.ExitError, err)
}
- cache, scErr := scClient.GetScCache()
+ cache, scErr := scClient.GetScCache(context.Background())
if scErr != nil {
cmd.StopAndExit(cmd.ExitError, scErr)
}
diff --git a/scctl/pkg/plugin/get/schema/schema_cmd.go b/scctl/pkg/plugin/get/schema/schema_cmd.go
index 97f9c7b..412dc81 100644
--- a/scctl/pkg/plugin/get/schema/schema_cmd.go
+++ b/scctl/pkg/plugin/get/schema/schema_cmd.go
@@ -25,6 +25,7 @@
adminModel "github.com/apache/servicecomb-service-center/server/admin/model"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/spf13/cobra"
+ "golang.org/x/net/context"
"io"
"io/ioutil"
"os"
@@ -80,7 +81,7 @@
if err != nil {
cmd.StopAndExit(cmd.ExitError, err)
}
- cache, scErr := scClient.GetScCache()
+ cache, scErr := scClient.GetScCache(context.Background())
if scErr != nil {
cmd.StopAndExit(cmd.ExitError, scErr)
}
@@ -109,7 +110,7 @@
continue
}
- schemas, err := scClient.GetSchemasByServiceId(domainProject, ms.Value.ServiceId)
+ schemas, err := scClient.GetSchemasByServiceId(context.Background(), domainProject, ms.Value.ServiceId)
if err != nil {
cmd.StopAndExit(cmd.ExitError, err)
}
diff --git a/scctl/pkg/plugin/get/service/service_cmd.go b/scctl/pkg/plugin/get/service/service_cmd.go
index 70df59b..3c63f41 100644
--- a/scctl/pkg/plugin/get/service/service_cmd.go
+++ b/scctl/pkg/plugin/get/service/service_cmd.go
@@ -24,6 +24,7 @@
"github.com/apache/servicecomb-service-center/scctl/pkg/writer"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/spf13/cobra"
+ "golang.org/x/net/context"
"strings"
)
@@ -48,7 +49,7 @@
if err != nil {
cmd.StopAndExit(cmd.ExitError, err)
}
- cache, scErr := scClient.GetScCache()
+ cache, scErr := scClient.GetScCache(context.Background())
if scErr != nil {
cmd.StopAndExit(cmd.ExitError, scErr)
}
diff --git a/scctl/pkg/plugin/health/cmd.go b/scctl/pkg/plugin/health/cmd.go
index c932605..c9e5c31 100644
--- a/scctl/pkg/plugin/health/cmd.go
+++ b/scctl/pkg/plugin/health/cmd.go
@@ -20,6 +20,7 @@
"github.com/apache/servicecomb-service-center/scctl/pkg/cmd"
scerr "github.com/apache/servicecomb-service-center/server/error"
"github.com/spf13/cobra"
+ "golang.org/x/net/context"
)
const (
@@ -48,7 +49,7 @@
if err != nil {
cmd.StopAndExit(ExistInternal, err)
}
- scErr := scClient.HealthCheck()
+ scErr := scClient.HealthCheck(context.Background())
if scErr != nil {
switch scErr.Code {
case scerr.ErrUnavailableBackend:
diff --git a/scctl/pkg/plugin/version/cmd.go b/scctl/pkg/plugin/version/cmd.go
index cba9172..877d23b 100644
--- a/scctl/pkg/plugin/version/cmd.go
+++ b/scctl/pkg/plugin/version/cmd.go
@@ -21,6 +21,7 @@
"github.com/apache/servicecomb-service-center/scctl/pkg/cmd"
"github.com/apache/servicecomb-service-center/scctl/pkg/version"
"github.com/spf13/cobra"
+ "golang.org/x/net/context"
)
var (
@@ -51,7 +52,7 @@
if err != nil {
return
}
- v, err := scClient.GetScVersion()
+ v, err := scClient.GetScVersion(context.Background())
if err != nil {
return
}
diff --git a/server/handler/cache/cache.go b/server/handler/cache/cache.go
index 4bb6fd5..adc3e66 100644
--- a/server/handler/cache/cache.go
+++ b/server/handler/cache/cache.go
@@ -19,6 +19,7 @@
import (
"github.com/apache/servicecomb-service-center/pkg/chain"
"github.com/apache/servicecomb-service-center/pkg/rest"
+ "github.com/apache/servicecomb-service-center/pkg/util"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
"net/http"
)
@@ -32,17 +33,18 @@
r := i.Context().Value(rest.CTX_REQUEST).(*http.Request)
query := r.URL.Query()
- if r.Method != http.MethodGet {
- i.WithContext(serviceUtil.CTX_REGISTRYONLY, "1")
+ global := util.StringTRUE(query.Get(serviceUtil.CTX_GLOBAL))
+ if global && r.Method == http.MethodGet {
+ i.WithContext(serviceUtil.CTX_GLOBAL, "1")
}
- noCache := query.Get(serviceUtil.CTX_NOCACHE) == "1"
+ noCache := util.StringTRUE(query.Get(serviceUtil.CTX_NOCACHE))
if noCache {
i.WithContext(serviceUtil.CTX_NOCACHE, "1")
return
}
- cacheOnly := query.Get(serviceUtil.CTX_CACHEONLY) == "1"
+ cacheOnly := util.StringTRUE(query.Get(serviceUtil.CTX_CACHEONLY))
if cacheOnly {
i.WithContext(serviceUtil.CTX_CACHEONLY, "1")
return
diff --git a/server/plugin/pkg/discovery/aggregate/indexer.go b/server/plugin/pkg/discovery/aggregate/indexer.go
index 912c997..b524a69 100644
--- a/server/plugin/pkg/discovery/aggregate/indexer.go
+++ b/server/plugin/pkg/discovery/aggregate/indexer.go
@@ -17,7 +17,6 @@
import (
"github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/apache/servicecomb-service-center/server/core/backend"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
"golang.org/x/net/context"
@@ -42,9 +41,9 @@
if _, ok := exists[key]; !ok {
exists[key] = struct{}{}
response.Kvs = append(response.Kvs, kv)
- response.Count += 1
}
}
+ response.Count += resp.Count
}
return &response, nil
}
@@ -54,31 +53,48 @@
}
type AggregatorIndexer struct {
- Indexer discovery.Indexer
- Registry discovery.Indexer
+ *discovery.CacheIndexer
+ AdaptorsIndexer discovery.Indexer
+ LocalIndexer discovery.Indexer
}
-func (i *AggregatorIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (*discovery.Response, error) {
- op := registry.OptionsToOp(opts...)
- if op.RegistryOnly {
- return i.Registry.Search(ctx, opts...)
+func (i *AggregatorIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (resp *discovery.Response, err error) {
+ op := registry.OpGet(opts...)
+
+ if op.NoCache() || !op.Global {
+ return i.search(ctx, opts...)
}
- return i.Indexer.Search(ctx, opts...)
+ resp, err = i.CacheIndexer.Search(ctx, opts...)
+ if err != nil {
+ return
+ }
+
+ if resp.Count > 0 || op.CacheOnly() {
+ return resp, nil
+ }
+
+ return i.search(ctx, opts...)
+}
+
+func (i *AggregatorIndexer) search(ctx context.Context, opts ...registry.PluginOpOption) (*discovery.Response, error) {
+ op := registry.OptionsToOp(opts...)
+ if !op.Global {
+ return i.LocalIndexer.Search(ctx, opts...)
+ }
+
+ return i.AdaptorsIndexer.Search(ctx, opts...)
}
func NewAggregatorIndexer(as *Aggregator) *AggregatorIndexer {
- ai := &AggregatorIndexer{}
- switch as.Type {
- case backend.SCHEMA:
- // schema does not been cached
- ai.Indexer = NewAdaptorsIndexer(as.Adaptors)
- default:
- ai.Indexer = discovery.NewCacheIndexer(as.Cache())
+ indexer := NewAdaptorsIndexer(as.Adaptors)
+ ai := &AggregatorIndexer{
+ CacheIndexer: discovery.NewCacheIndexer(as.Cache()),
+ AdaptorsIndexer: indexer,
+ LocalIndexer: indexer,
}
- ai.Registry = ai.Indexer
if registryIndex >= 0 {
- ai.Registry = as.Adaptors[registryIndex]
+ ai.LocalIndexer = as.Adaptors[registryIndex]
}
return ai
}
diff --git a/server/plugin/pkg/discovery/cacher.go b/server/plugin/pkg/discovery/cacher.go
index 9a9f884..086d928 100644
--- a/server/plugin/pkg/discovery/cacher.go
+++ b/server/plugin/pkg/discovery/cacher.go
@@ -40,7 +40,7 @@
default:
c.cache.Put(key, kv)
}
- c.OnEvent(KvEvent{Type: action, KV: kv})
+ c.OnEvent(KvEvent{Type: action, KV: kv, Revision: kv.ModRevision})
}
func (c *CommonCacher) OnEvent(evt KvEvent) {
diff --git a/server/plugin/pkg/discovery/etcd/indexer_cache.go b/server/plugin/pkg/discovery/etcd/indexer_cache.go
index 35e7ac6..09b371a 100644
--- a/server/plugin/pkg/discovery/etcd/indexer_cache.go
+++ b/server/plugin/pkg/discovery/etcd/indexer_cache.go
@@ -33,10 +33,7 @@
op := registry.OpGet(opts...)
key := util.BytesToStringWithNoCopy(op.Key)
- if i.Cache == nil ||
- op.Mode == registry.MODE_NO_CACHE ||
- op.Revision > 0 ||
- (op.Offset >= 0 && op.Limit > 0) {
+ if op.NoCache() {
return i.EtcdIndexer.Search(ctx, opts...)
}
@@ -49,7 +46,7 @@
return nil, err
}
- if resp.Count > 0 || op.Mode == registry.MODE_CACHE {
+ if resp.Count > 0 || op.CacheOnly() {
return resp, nil
}
diff --git a/server/plugin/pkg/discovery/etcd/indexer_etcd.go b/server/plugin/pkg/discovery/etcd/indexer_etcd.go
index 49e6d03..a38e9ba 100644
--- a/server/plugin/pkg/discovery/etcd/indexer_etcd.go
+++ b/server/plugin/pkg/discovery/etcd/indexer_etcd.go
@@ -69,7 +69,7 @@
kvs := make([]*discovery.KeyValue, 0, len(resp.Kvs))
for _, src := range resp.Kvs {
- kv := new(discovery.KeyValue)
+ kv := discovery.NewKeyValue()
if err = FromEtcdKeyValue(kv, src, p); err != nil {
continue
}
diff --git a/server/plugin/pkg/discovery/servicecenter/adaptor.go b/server/plugin/pkg/discovery/servicecenter/adaptor.go
index 04a8719..77e0086 100644
--- a/server/plugin/pkg/discovery/servicecenter/adaptor.go
+++ b/server/plugin/pkg/discovery/servicecenter/adaptor.go
@@ -48,13 +48,14 @@
func NewServiceCenterAdaptor(t discovery.Type, cfg *discovery.Config) *ServiceCenterAdaptor {
if t == backend.SCHEMA {
return &ServiceCenterAdaptor{
- Indexer: GetOrCreateClusterIndexer(),
+ Indexer: NewClusterIndexer(t, discovery.NullCache),
Cacher: discovery.NullCacher,
}
- }
- cache := discovery.NewKvCache(t.String(), cfg)
- return &ServiceCenterAdaptor{
- Indexer: discovery.NewCacheIndexer(cache),
- Cacher: BuildCacher(t, cfg, cache),
+ } else {
+ cache := discovery.NewKvCache(t.String(), cfg)
+ return &ServiceCenterAdaptor{
+ Indexer: NewClusterIndexer(t, cache),
+ Cacher: BuildCacher(t, cfg, cache),
+ }
}
}
diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate.go b/server/plugin/pkg/discovery/servicecenter/aggregate.go
index 1cb097b..bafa296 100644
--- a/server/plugin/pkg/discovery/servicecenter/aggregate.go
+++ b/server/plugin/pkg/discovery/servicecenter/aggregate.go
@@ -19,18 +19,26 @@
"crypto/tls"
"github.com/apache/servicecomb-service-center/pkg/client/sc"
"github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/admin/model"
- pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/core"
scerr "github.com/apache/servicecomb-service-center/server/error"
mgr "github.com/apache/servicecomb-service-center/server/plugin"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+ "golang.org/x/net/context"
"strings"
+ "sync"
+)
+
+var (
+ scClient *SCClientAggregate
+ clientOnce sync.Once
+ clientTLS *tls.Config
)
type SCClientAggregate []*sc.SCClient
-var clientTLS *tls.Config
-
func getClientTLS() (*tls.Config, error) {
if clientTLS != nil {
return clientTLS, nil
@@ -40,11 +48,11 @@
return clientTLS, err
}
-func (c *SCClientAggregate) GetScCache() (*model.Cache, map[string]error) {
+func (c *SCClientAggregate) GetScCache(ctx context.Context) (*model.Cache, map[string]error) {
var caches *model.Cache
errs := make(map[string]error)
for _, client := range *c {
- cache, err := client.GetScCache()
+ cache, err := client.GetScCache(ctx)
if err != nil {
errs[client.Cfg.Name] = err
continue
@@ -53,72 +61,148 @@
if caches == nil {
caches = &model.Cache{}
}
- caches.Microservices = append(caches.Microservices, cache.Microservices...)
- caches.Indexes = append(caches.Indexes, cache.Indexes...)
- caches.Aliases = append(caches.Aliases, cache.Aliases...)
- caches.Tags = append(caches.Tags, cache.Tags...)
- caches.Rules = append(caches.Rules, cache.Rules...)
- caches.RuleIndexes = append(caches.RuleIndexes, cache.RuleIndexes...)
- caches.DependencyRules = append(caches.DependencyRules, cache.DependencyRules...)
- caches.Summaries = append(caches.Summaries, cache.Summaries...)
- caches.Instances = append(caches.Instances, cache.Instances...)
+ c.cacheAppend(client.Cfg.Name, &caches.Microservices, &cache.Microservices)
+ c.cacheAppend(client.Cfg.Name, &caches.Indexes, &cache.Indexes)
+ c.cacheAppend(client.Cfg.Name, &caches.Aliases, &cache.Aliases)
+ c.cacheAppend(client.Cfg.Name, &caches.Tags, &cache.Tags)
+ c.cacheAppend(client.Cfg.Name, &caches.Rules, &cache.Rules)
+ c.cacheAppend(client.Cfg.Name, &caches.RuleIndexes, &cache.RuleIndexes)
+ c.cacheAppend(client.Cfg.Name, &caches.DependencyRules, &cache.DependencyRules)
+ c.cacheAppend(client.Cfg.Name, &caches.Summaries, &cache.Summaries)
+ c.cacheAppend(client.Cfg.Name, &caches.Instances, &cache.Instances)
}
return caches, errs
}
-func (c *SCClientAggregate) GetSchemasByServiceId(domainProject, serviceId string) ([]*pb.Schema, *scerr.Error) {
- var schemas []*pb.Schema
+func (c *SCClientAggregate) cacheAppend(name string, setter model.Setter, getter model.Getter) {
+ getter.ForEach(func(_ int, v *model.KV) bool {
+ v.ClusterName = name
+ setter.SetValue(v)
+ return true
+ })
+}
+
+func (c *SCClientAggregate) GetSchemasByServiceId(ctx context.Context, domainProject, serviceId string) (*discovery.Response, *scerr.Error) {
+ var response discovery.Response
for _, client := range *c {
- ss, err := client.GetSchemasByServiceId(domainProject, serviceId)
+ schemas, err := client.GetSchemasByServiceId(ctx, domainProject, serviceId)
if err != nil && err.InternalError() {
log.Errorf(err, "get schema by serviceId[%s/%s] failed", domainProject, serviceId)
continue
}
- schemas = append(schemas, ss...)
+ if schemas == nil {
+ continue
+ }
+ response.Count = int64(len(schemas))
+ for _, schema := range schemas {
+ response.Kvs = append(response.Kvs, &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)),
+ Value: util.StringToBytesWithNoCopy(schema.Schema),
+ ModRevision: 0,
+ ClusterName: client.Cfg.Name,
+ })
+ }
+ return &response, nil
}
-
- return schemas, nil
+ return &response, nil
}
-func (c *SCClientAggregate) GetSchemaBySchemaId(domainProject, serviceId, schemaId string) (schema *pb.Schema, err *scerr.Error) {
+func (c *SCClientAggregate) GetSchemaBySchemaId(ctx context.Context, domainProject, serviceId, schemaId string) (*discovery.Response, *scerr.Error) {
+ var response discovery.Response
for _, client := range *c {
- schema, err = client.GetSchemaBySchemaId(domainProject, serviceId, schemaId)
+ schema, err := client.GetSchemaBySchemaId(ctx, domainProject, serviceId, schemaId)
if err != nil && err.InternalError() {
log.Errorf(err, "get schema by serviceId[%s/%s] failed", domainProject, serviceId)
continue
}
- if schema != nil {
- return
+ if schema == nil {
+ continue
}
+ response.Count = 1
+ response.Kvs = append(response.Kvs, &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)),
+ Value: util.StringToBytesWithNoCopy(schema.Schema),
+ ModRevision: 0,
+ ClusterName: client.Cfg.Name,
+ })
+ return &response, nil
}
-
- return
+ return &response, nil
}
-func NewSCClientAggregate() *SCClientAggregate {
- c := &SCClientAggregate{}
- clusters := registry.Configuration().Clusters
- for name, endpoints := range clusters {
- if len(name) == 0 || name == registry.Configuration().ClusterName {
+func (c *SCClientAggregate) GetInstancesByServiceId(ctx context.Context, domainProject, providerId, consumerId string) (*discovery.Response, *scerr.Error) {
+ var response discovery.Response
+ for _, client := range *c {
+ insts, err := client.GetInstancesByServiceId(ctx, domainProject, providerId, consumerId)
+ if err != nil && err.InternalError() {
+ log.Errorf(err, "consumer[%s] get provider[%s/%s] instances failed", consumerId, domainProject, providerId)
continue
}
- client, err := sc.NewSCClient(sc.Config{Name: name, Endpoints: endpoints})
- if err != nil {
- log.Errorf(err, "new service center[%s]%v client failed", name, endpoints)
+ if insts == nil {
continue
}
- client.Timeout = registry.Configuration().RequestTimeOut
- // TLS
- if strings.Index(endpoints[0], "https") >= 0 {
- client.TLS, err = getClientTLS()
- if err != nil {
- log.Errorf(err, "get service center[%s]%v tls config failed", name, endpoints)
+ response.Count = int64(len(insts))
+ for _, instance := range insts {
+ response.Kvs = append(response.Kvs, &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey(domainProject, providerId, instance.InstanceId)),
+ Value: instance,
+ ModRevision: 0,
+ ClusterName: client.Cfg.Name,
+ })
+ }
+ }
+ return &response, nil
+}
+
+func (c *SCClientAggregate) GetInstanceByInstanceId(ctx context.Context, domainProject, providerId, instanceId, consumerId string) (*discovery.Response, *scerr.Error) {
+ var response discovery.Response
+ for _, client := range *c {
+ instance, err := client.GetInstanceByInstanceId(ctx, domainProject, providerId, instanceId, consumerId)
+ if err != nil && err.InternalError() {
+ log.Errorf(err, "consumer[%s] get provider[%s/%s] instances failed", consumerId, domainProject, providerId)
+ continue
+ }
+ if instance == nil {
+ continue
+ }
+ response.Count = 1
+ response.Kvs = append(response.Kvs, &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey(domainProject, providerId, instance.InstanceId)),
+ Value: instance,
+ ModRevision: 0,
+ ClusterName: client.Cfg.Name,
+ })
+ return &response, nil
+ }
+ return &response, nil
+}
+
+func GetOrCreateSCClient() *SCClientAggregate {
+ clientOnce.Do(func() {
+ scClient = &SCClientAggregate{}
+ clusters := registry.Configuration().Clusters
+ for name, endpoints := range clusters {
+ if len(name) == 0 || name == registry.Configuration().ClusterName {
continue
}
- }
+ client, err := sc.NewSCClient(sc.Config{Name: name, Endpoints: endpoints})
+ if err != nil {
+ log.Errorf(err, "new service center[%s]%v client failed", name, endpoints)
+ continue
+ }
+ client.Timeout = registry.Configuration().RequestTimeOut
+ // TLS
+ if strings.Index(endpoints[0], "https") >= 0 {
+ client.TLS, err = getClientTLS()
+ if err != nil {
+ log.Errorf(err, "get service center[%s]%v tls config failed", name, endpoints)
+ continue
+ }
+ }
- *c = append(*c, client)
- log.Infof("new service center[%s]%v client", name, endpoints)
- }
- return c
+ *scClient = append(*scClient, client)
+ log.Infof("new service center[%s]%v client", name, endpoints)
+ }
+ })
+ return scClient
}
diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate_test.go b/server/plugin/pkg/discovery/servicecenter/aggregate_test.go
index e906c2b..3cf6c5c 100644
--- a/server/plugin/pkg/discovery/servicecenter/aggregate_test.go
+++ b/server/plugin/pkg/discovery/servicecenter/aggregate_test.go
@@ -23,7 +23,7 @@
func TestNewSCClientAggregate(t *testing.T) {
registry.Configuration().ClusterAddresses = "sc-1=127.0.0.1:2379,127.0.0.2:2379"
registry.Configuration().InitClusters()
- c := NewSCClientAggregate()
+ c := GetOrCreateSCClient()
if len(*c) == 0 {
t.Fatalf("TestNewSCClientAggregate failed")
}
diff --git a/server/plugin/pkg/discovery/servicecenter/cacher.go b/server/plugin/pkg/discovery/servicecenter/cacher.go
index 0634700..73faae6 100644
--- a/server/plugin/pkg/discovery/servicecenter/cacher.go
+++ b/server/plugin/pkg/discovery/servicecenter/cacher.go
@@ -24,7 +24,7 @@
}
func (c *ServiceCenterCacher) Ready() <-chan struct{} {
- return GetOrCreateClusterIndexer().Ready()
+ return closedCh
}
func NewServiceCenterCacher(cfg *discovery.Config, cache discovery.Cache) *ServiceCenterCacher {
@@ -35,6 +35,6 @@
func BuildCacher(t discovery.Type, cfg *discovery.Config, cache discovery.Cache) discovery.Cacher {
cr := NewServiceCenterCacher(cfg, cache)
- GetOrCreateClusterIndexer().AddCacher(t, cr)
+ GetOrCreateSyncer().AddCacher(t, cr)
return cr
}
diff --git a/server/plugin/pkg/discovery/servicecenter/indexer.go b/server/plugin/pkg/discovery/servicecenter/indexer.go
index 746cfa3..5ac4768 100644
--- a/server/plugin/pkg/discovery/servicecenter/indexer.go
+++ b/server/plugin/pkg/discovery/servicecenter/indexer.go
@@ -16,262 +16,97 @@
package servicecenter
import (
- "fmt"
- "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/client/sc"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/apache/servicecomb-service-center/server/admin/model"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
- pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ scerr "github.com/apache/servicecomb-service-center/server/error"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
"golang.org/x/net/context"
- "strings"
- "sync"
- "time"
-)
-
-var (
- cluster *ClusterIndexer
- clusterOnce sync.Once
)
type ClusterIndexer struct {
+ *discovery.CacheIndexer
Client *SCClientAggregate
-
- cachers map[discovery.Type]*ServiceCenterCacher
+ Type discovery.Type
}
-func (c *ClusterIndexer) Initialize() {
- c.cachers = make(map[discovery.Type]*ServiceCenterCacher)
- c.Client = NewSCClientAggregate()
+func (i *ClusterIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (resp *discovery.Response, err error) {
+ op := registry.OpGet(opts...)
+
+ if op.NoCache() {
+ return i.search(ctx, opts...)
+ }
+
+ resp, err = i.CacheIndexer.Search(ctx, opts...)
+ if err != nil {
+ return
+ }
+
+ if resp.Count > 0 || op.CacheOnly() {
+ return resp, nil
+ }
+
+ return i.search(ctx, opts...)
}
-func (c *ClusterIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (r *discovery.Response, err error) {
+func (i *ClusterIndexer) search(ctx context.Context, opts ...registry.PluginOpOption) (r *discovery.Response, err error) {
op := registry.OpGet(opts...)
key := util.BytesToStringWithNoCopy(op.Key)
- switch {
- case strings.Index(key, core.GetServiceSchemaRootKey("")) == 0:
- domainProject, serviceId, schemaId := core.GetInfoFromSchemaKV(op.Key)
- var schemas []*pb.Schema
- if op.Prefix && len(schemaId) == 0 {
- schemas, err = c.Client.GetSchemasByServiceId(domainProject, serviceId)
- if err != nil {
- return nil, err
- }
- } else {
- schema, err := c.Client.GetSchemaBySchemaId(domainProject, serviceId, schemaId)
- if err != nil {
- return nil, err
- }
- schemas = append(schemas, schema)
- }
- var response discovery.Response
- response.Count = int64(len(schemas))
- if op.CountOnly {
- return &response, nil
- }
- for _, schema := range schemas {
- response.Kvs = append(response.Kvs, &discovery.KeyValue{
- Key: util.StringToBytesWithNoCopy(
- core.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)),
- Value: util.StringToBytesWithNoCopy(schema.Schema),
- })
- }
- return &response, nil
+
+ ctx = context.WithValue(ctx, sc.QueryGlobal, "0")
+ switch i.Type {
+ case backend.SCHEMA:
+ r, err = i.searchSchemas(ctx, op)
+ case backend.INSTANCE:
+ r, err = i.searchInstances(ctx, op)
default:
- return nil, fmt.Errorf("no implement")
+ return &discovery.Response{}, nil
}
+ log.Debugf("search '%s' match special options, request sc server, opts: %s", key, op)
+ return
}
-func (c *ClusterIndexer) Sync(ctx context.Context) error {
- cache, errs := c.Client.GetScCache()
- if cache == nil && len(errs) > 0 {
- err := fmt.Errorf("%v", errs)
- log.Errorf(err, "sync failed")
- return err
+func (i *ClusterIndexer) searchSchemas(ctx context.Context, op registry.PluginOp) (*discovery.Response, error) {
+ var (
+ resp *discovery.Response
+ scErr *scerr.Error
+ )
+ domainProject, serviceId, schemaId := core.GetInfoFromSchemaKV(op.Key)
+ if op.Prefix && len(schemaId) == 0 {
+ resp, scErr = i.Client.GetSchemasByServiceId(ctx, domainProject, serviceId)
+ } else {
+ resp, scErr = i.Client.GetSchemaBySchemaId(ctx, domainProject, serviceId, schemaId)
}
-
- // microservice
- serviceCacher, ok := c.cachers[backend.SERVICE]
- if ok {
- c.check(serviceCacher, &cache.Microservices, errs)
+ if scErr != nil {
+ return nil, scErr
}
- aliasCacher, ok := c.cachers[backend.SERVICE_ALIAS]
- if ok {
- c.checkWithConflictHandleFunc(aliasCacher, &cache.Aliases, errs, c.logConflictFunc)
- }
- indexCacher, ok := c.cachers[backend.SERVICE_INDEX]
- if ok {
- c.checkWithConflictHandleFunc(indexCacher, &cache.Indexes, errs, c.logConflictFunc)
- }
- // instance
- instCacher, ok := c.cachers[backend.INSTANCE]
- if ok {
- c.check(instCacher, &cache.Instances, errs)
- }
- // microservice meta
- tagCacher, ok := c.cachers[backend.SERVICE_TAG]
- if ok {
- c.check(tagCacher, &cache.Tags, errs)
- }
- ruleCacher, ok := c.cachers[backend.RULE]
- if ok {
- c.check(ruleCacher, &cache.Rules, errs)
- }
- ruleIndexCacher, ok := c.cachers[backend.RULE_INDEX]
- if ok {
- c.check(ruleIndexCacher, &cache.RuleIndexes, errs)
- }
- depRuleCacher, ok := c.cachers[backend.DEPENDENCY_RULE]
- if ok {
- c.check(depRuleCacher, &cache.DependencyRules, errs)
- }
- return nil
+ return resp, nil
}
-func (c *ClusterIndexer) check(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error) {
- c.checkWithConflictHandleFunc(local, remote, skipClusters, c.skipHandleFunc)
-}
-
-func (c *ClusterIndexer) checkWithConflictHandleFunc(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error,
- conflictHandleFunc func(origin *model.KV, conflict model.Getter, index int)) {
- exists := make(map[string]*model.KV)
- remote.ForEach(func(i int, v *model.KV) bool {
- // because the result of the remote return may contain the same data as
- // the local cache of the current SC. So we need to ignore it and
- // prevent the aggregation result from increasing.
- if v.ClusterName == registry.Configuration().ClusterName {
- return true
- }
- if kv, ok := exists[v.Key]; ok {
- conflictHandleFunc(kv, remote, i)
- return true
- }
- exists[v.Key] = v
- kv := local.Cache().Get(v.Key)
- newKv := &discovery.KeyValue{
- Key: util.StringToBytesWithNoCopy(v.Key),
- Value: v.Value,
- ModRevision: v.Rev,
- ClusterName: v.ClusterName,
- }
- switch {
- case kv == nil:
- newKv.Version = 1
- newKv.CreateRevision = v.Rev
- local.Notify(pb.EVT_CREATE, v.Key, newKv)
- case kv.ModRevision != v.Rev:
- // if connect to some cluster failed, then skip to notify changes
- // of these clusters to prevent publish the wrong changes events of kvs.
- if err, ok := skipClusters[kv.ClusterName]; ok {
- log.Errorf(err, "cluster[%s] temporarily unavailable, skip cluster[%s] event %s %s",
- kv.ClusterName, v.ClusterName, pb.EVT_UPDATE, v.Key)
- break
- }
- newKv.Version = kv.ModRevision - kv.ModRevision
- newKv.CreateRevision = kv.CreateRevision
- local.Notify(pb.EVT_UPDATE, v.Key, newKv)
- }
- return true
- })
-
- var deletes []*discovery.KeyValue
- local.Cache().ForEach(func(key string, v *discovery.KeyValue) (next bool) {
- var exist bool
- remote.ForEach(func(_ int, v *model.KV) bool {
- if v.ClusterName == registry.Configuration().ClusterName {
- return true
- }
- exist = v.Key == key
- return !exist
- })
- if !exist {
- if err, ok := skipClusters[v.ClusterName]; ok {
- log.Errorf(err, "cluster[%s] temporarily unavailable, skip event %s %s",
- v.ClusterName, pb.EVT_DELETE, v.Key)
- return true
- }
- deletes = append(deletes, v)
- }
- return true
- })
- for _, v := range deletes {
- local.Notify(pb.EVT_DELETE, util.BytesToStringWithNoCopy(v.Key), v)
+func (i *ClusterIndexer) searchInstances(ctx context.Context, op registry.PluginOp) (r *discovery.Response, err error) {
+ var (
+ resp *discovery.Response
+ scErr *scerr.Error
+ )
+ serviceId, instanceId, domainProject := core.GetInfoFromInstKV(op.Key)
+ if op.Prefix && len(instanceId) == 0 {
+ resp, scErr = i.Client.GetInstancesByServiceId(ctx, domainProject, serviceId, "")
+ } else {
+ resp, scErr = i.Client.GetInstanceByInstanceId(ctx, domainProject, serviceId, instanceId, "")
}
-}
-
-func (c *ClusterIndexer) skipHandleFunc(origin *model.KV, conflict model.Getter, index int) {
-}
-
-func (c *ClusterIndexer) logConflictFunc(origin *model.KV, conflict model.Getter, index int) {
- switch conflict.(type) {
- case *model.MicroserviceIndexSlice:
- slice := conflict.(*model.MicroserviceIndexSlice)
- kv := (*slice)[index]
- if serviceId := origin.Value.(string); kv.Value != serviceId {
- key := core.GetInfoFromSvcIndexKV(util.StringToBytesWithNoCopy(kv.Key))
- log.Warnf("conflict! can not merge microservice index[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
- kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
- serviceId, origin.ClusterName)
- }
- case *model.MicroserviceAliasSlice:
- slice := conflict.(*model.MicroserviceAliasSlice)
- kv := (*slice)[index]
- if serviceId := origin.Value.(string); kv.Value != serviceId {
- key := core.GetInfoFromSvcAliasKV(util.StringToBytesWithNoCopy(kv.Key))
- log.Warnf("conflict! can not merge microservice alias[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
- kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
- serviceId, origin.ClusterName)
- }
+ if scErr != nil {
+ return nil, scErr
}
+ return resp, nil
}
-func (c *ClusterIndexer) loop(ctx context.Context) {
- select {
- case <-ctx.Done():
- case <-time.After(minWaitInterval):
- c.Sync(ctx)
- d := registry.Configuration().AutoSyncInterval
- if d == 0 {
- return
- }
- loop:
- for {
- select {
- case <-ctx.Done():
- break loop
- case <-time.After(d):
- // TODO support watching sc
- c.Sync(ctx)
- }
- }
+func NewClusterIndexer(t discovery.Type, cache discovery.Cache) *ClusterIndexer {
+ return &ClusterIndexer{
+ CacheIndexer: discovery.NewCacheIndexer(cache),
+ Client: GetOrCreateSCClient(),
+ Type: t,
}
-
- log.Debug("service center client is stopped")
-}
-
-// unsafe
-func (c *ClusterIndexer) AddCacher(t discovery.Type, cacher *ServiceCenterCacher) {
- c.cachers[t] = cacher
-}
-
-func (c *ClusterIndexer) Run() {
- c.Initialize()
- gopool.Go(c.loop)
-}
-
-func (c *ClusterIndexer) Stop() {}
-
-func (c *ClusterIndexer) Ready() <-chan struct{} {
- return closedCh
-}
-
-func GetOrCreateClusterIndexer() *ClusterIndexer {
- clusterOnce.Do(func() {
- cluster = &ClusterIndexer{}
- cluster.Run()
- })
- return cluster
}
diff --git a/server/plugin/pkg/discovery/servicecenter/syncer.go b/server/plugin/pkg/discovery/servicecenter/syncer.go
new file mode 100644
index 0000000..c504c28
--- /dev/null
+++ b/server/plugin/pkg/discovery/servicecenter/syncer.go
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package servicecenter
+
+import (
+ "fmt"
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/admin/model"
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+ "golang.org/x/net/context"
+ "sync"
+ "time"
+)
+
+var (
+ syncer *Syncer
+ syncerOnce sync.Once
+)
+
+type Syncer struct {
+ Client *SCClientAggregate
+
+ cachers map[discovery.Type]*ServiceCenterCacher
+}
+
+func (c *Syncer) Initialize() {
+ c.cachers = make(map[discovery.Type]*ServiceCenterCacher)
+ c.Client = GetOrCreateSCClient()
+}
+
+func (c *Syncer) Sync(ctx context.Context) error {
+ cache, errs := c.Client.GetScCache(ctx)
+ if cache == nil && len(errs) > 0 {
+ err := fmt.Errorf("%v", errs)
+ log.Errorf(err, "sync failed")
+ return err
+ }
+
+ // microservice
+ serviceCacher, ok := c.cachers[backend.SERVICE]
+ if ok {
+ c.check(serviceCacher, &cache.Microservices, errs)
+ }
+ indexCacher, ok := c.cachers[backend.SERVICE_INDEX]
+ if ok {
+ c.checkWithConflictHandleFunc(indexCacher, &cache.Indexes, errs, c.logConflictFunc)
+ }
+ aliasCacher, ok := c.cachers[backend.SERVICE_ALIAS]
+ if ok {
+ c.checkWithConflictHandleFunc(aliasCacher, &cache.Aliases, errs, c.logConflictFunc)
+ }
+ // microservice meta
+ tagCacher, ok := c.cachers[backend.SERVICE_TAG]
+ if ok {
+ c.check(tagCacher, &cache.Tags, errs)
+ }
+ ruleCacher, ok := c.cachers[backend.RULE]
+ if ok {
+ c.check(ruleCacher, &cache.Rules, errs)
+ }
+ ruleIndexCacher, ok := c.cachers[backend.RULE_INDEX]
+ if ok {
+ c.check(ruleIndexCacher, &cache.RuleIndexes, errs)
+ }
+ depRuleCacher, ok := c.cachers[backend.DEPENDENCY_RULE]
+ if ok {
+ c.check(depRuleCacher, &cache.DependencyRules, errs)
+ }
+ schemaSummaryCacher, ok := c.cachers[backend.SCHEMA_SUMMARY]
+ if ok {
+ c.check(schemaSummaryCacher, &cache.Summaries, errs)
+ }
+ // instance
+ instCacher, ok := c.cachers[backend.INSTANCE]
+ if ok {
+ c.check(instCacher, &cache.Instances, errs)
+ }
+ return nil
+}
+
+func (c *Syncer) check(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error) {
+ c.checkWithConflictHandleFunc(local, remote, skipClusters, c.skipHandleFunc)
+}
+
+func (c *Syncer) checkWithConflictHandleFunc(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error,
+ conflictHandleFunc func(origin *model.KV, conflict model.Getter, index int)) {
+ exists := make(map[string]*model.KV)
+ remote.ForEach(func(i int, v *model.KV) bool {
+ // The remote result may include the same data as the local cache of
+ // the current SC, so skip those entries to keep the aggregation from
+ // double-counting them.
+ if v.ClusterName == registry.Configuration().ClusterName {
+ return true
+ }
+ if kv, ok := exists[v.Key]; ok {
+ conflictHandleFunc(kv, remote, i)
+ return true
+ }
+ exists[v.Key] = v
+ old := local.Cache().Get(v.Key)
+ newKv := &discovery.KeyValue{
+ Key: util.StringToBytesWithNoCopy(v.Key),
+ Value: v.Value,
+ ModRevision: v.Rev,
+ ClusterName: v.ClusterName,
+ }
+ switch {
+ case old == nil:
+ newKv.Version = 1
+ newKv.CreateRevision = v.Rev
+ local.Notify(pb.EVT_CREATE, v.Key, newKv)
+ case old.ModRevision != v.Rev:
+ // If connecting to some cluster failed, skip notifying changes from
+ // those clusters to avoid publishing wrong change events for the kvs.
+ if err, ok := skipClusters[old.ClusterName]; ok {
+ log.Errorf(err, "cluster[%s] temporarily unavailable, skip cluster[%s] event %s %s",
+ old.ClusterName, v.ClusterName, pb.EVT_UPDATE, v.Key)
+ break
+ }
+ newKv.Version = 1 + old.Version
+ newKv.CreateRevision = old.CreateRevision
+ local.Notify(pb.EVT_UPDATE, v.Key, newKv)
+ }
+ return true
+ })
+
+ var deletes []*discovery.KeyValue
+ local.Cache().ForEach(func(key string, v *discovery.KeyValue) (next bool) {
+ var exist bool
+ remote.ForEach(func(_ int, v *model.KV) bool {
+ if v.ClusterName == registry.Configuration().ClusterName {
+ return true
+ }
+ exist = v.Key == key
+ return !exist
+ })
+ if !exist {
+ if err, ok := skipClusters[v.ClusterName]; ok {
+ log.Errorf(err, "cluster[%s] temporarily unavailable, skip event %s %s",
+ v.ClusterName, pb.EVT_DELETE, v.Key)
+ return true
+ }
+ deletes = append(deletes, v)
+ }
+ return true
+ })
+ for _, v := range deletes {
+ local.Notify(pb.EVT_DELETE, util.BytesToStringWithNoCopy(v.Key), v)
+ }
+}
+
+func (c *Syncer) skipHandleFunc(origin *model.KV, conflict model.Getter, index int) {
+}
+
+func (c *Syncer) logConflictFunc(origin *model.KV, conflict model.Getter, index int) {
+ switch conflict.(type) {
+ case *model.MicroserviceIndexSlice:
+ slice := conflict.(*model.MicroserviceIndexSlice)
+ kv := (*slice)[index]
+ if serviceId := origin.Value.(string); kv.Value != serviceId {
+ key := core.GetInfoFromSvcIndexKV(util.StringToBytesWithNoCopy(kv.Key))
+ log.Warnf("conflict! can not merge microservice index[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
+ kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
+ serviceId, origin.ClusterName)
+ }
+ case *model.MicroserviceAliasSlice:
+ slice := conflict.(*model.MicroserviceAliasSlice)
+ kv := (*slice)[index]
+ if serviceId := origin.Value.(string); kv.Value != serviceId {
+ key := core.GetInfoFromSvcAliasKV(util.StringToBytesWithNoCopy(kv.Key))
+ log.Warnf("conflict! can not merge microservice alias[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
+ kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
+ serviceId, origin.ClusterName)
+ }
+ }
+}
+
+func (c *Syncer) loop(ctx context.Context) {
+ select {
+ case <-ctx.Done():
+ case <-time.After(minWaitInterval):
+ c.Sync(ctx)
+ d := registry.Configuration().AutoSyncInterval
+ if d == 0 {
+ return
+ }
+ loop:
+ for {
+ select {
+ case <-ctx.Done():
+ break loop
+ case <-time.After(d):
+ // TODO support watching sc
+ c.Sync(ctx)
+ }
+ }
+ }
+
+ log.Debug("service center clusters syncer is stopped")
+}
+
+// AddCacher is not thread-safe; callers must serialize access to the cachers map.
+func (c *Syncer) AddCacher(t discovery.Type, cacher *ServiceCenterCacher) {
+ c.cachers[t] = cacher
+}
+
+func (c *Syncer) Run() {
+ c.Initialize()
+ gopool.Go(c.loop)
+}
+
+func GetOrCreateSyncer() *Syncer {
+ syncerOnce.Do(func() {
+ syncer = &Syncer{}
+ syncer.Run()
+ })
+ return syncer
+}
diff --git a/server/plugin/pkg/discovery/servicecenter/indexer_test.go b/server/plugin/pkg/discovery/servicecenter/syncer_test.go
similarity index 81%
rename from server/plugin/pkg/discovery/servicecenter/indexer_test.go
rename to server/plugin/pkg/discovery/servicecenter/syncer_test.go
index 6d62217..f465832 100644
--- a/server/plugin/pkg/discovery/servicecenter/indexer_test.go
+++ b/server/plugin/pkg/discovery/servicecenter/syncer_test.go
@@ -26,7 +26,7 @@
)
func TestClusterIndexer_Sync(t *testing.T) {
- indexer := &ClusterIndexer{}
+ syncer := &Syncer{}
cache := discovery.NewKvCache("test", discovery.Configure())
cfg := discovery.Configure()
sccacher := NewServiceCenterCacher(cfg, cache)
@@ -36,7 +36,7 @@
cfg.WithEventFunc(func(discovery.KvEvent) {
t.Fatalf("TestClusterIndexer_Sync failed")
})
- indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
t.Fatalf("TestClusterIndexer_Sync failed")
})
@@ -49,7 +49,7 @@
})
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "a", Rev: 1, ClusterName: "a"})
- indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
t.Fatalf("TestClusterIndexer_Sync failed")
})
@@ -62,7 +62,7 @@
})
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"})
- indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) {
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
})
@@ -70,7 +70,7 @@
cfg.WithEventFunc(func(evt discovery.KvEvent) {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
})
- indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
t.Fatalf("TestClusterIndexer_Sync failed")
})
@@ -81,7 +81,7 @@
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"})
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "b"})
- indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, indexer.logConflictFunc)
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, syncer.logConflictFunc)
// case: conflict and print log
func() {
@@ -92,7 +92,7 @@
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"})
arr.SetValue(&model.KV{Key: "/a", Value: "ab", Rev: 2, ClusterName: "b"})
- indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, indexer.logConflictFunc)
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, syncer.logConflictFunc)
// '/a' is incorrect key and logConflictFunc will be excepted to panic here
t.Fatalf("TestClusterIndexer_Sync failed")
}()
@@ -103,7 +103,7 @@
})
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "ab", Rev: 3, ClusterName: "b"})
- indexer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) {
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
})
@@ -112,7 +112,7 @@
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
})
arr = model.MicroserviceIndexSlice{}
- indexer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) {
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
})
@@ -124,7 +124,7 @@
}
})
arr = model.MicroserviceIndexSlice{}
- indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) {
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
})
@@ -137,7 +137,7 @@
})
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "a", Rev: 1, ClusterName: registry.Configuration().ClusterName})
- indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
t.Fatalf("TestClusterIndexer_Sync failed")
})
@@ -151,7 +151,7 @@
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "x", Rev: 2, ClusterName: registry.Configuration().ClusterName})
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"})
- indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) {
+ syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
})
}
diff --git a/server/plugin/pkg/registry/option.go b/server/plugin/pkg/registry/option.go
index f81a22c..b6be514 100644
--- a/server/plugin/pkg/registry/option.go
+++ b/server/plugin/pkg/registry/option.go
@@ -37,7 +37,7 @@
WatchCallback WatchCallback
Offset int64
Limit int64
- RegistryOnly bool
+ Global bool
}
func (op PluginOp) String() string {
@@ -88,12 +88,22 @@
if op.Limit > 0 {
buf.WriteString(fmt.Sprintf("&limit=%d", op.Limit))
}
- if op.RegistryOnly {
- buf.WriteString("®istryOnly=true")
+ if op.Global {
+ buf.WriteString("&global=true")
}
return buf.String()
}
+func (op PluginOp) NoCache() bool {
+ return op.Mode == MODE_NO_CACHE ||
+ op.Revision > 0 ||
+ (op.Offset >= 0 && op.Limit > 0)
+}
+
+func (op PluginOp) CacheOnly() bool {
+ return op.Mode == MODE_CACHE
+}
+
type Operation func(...PluginOpOption) (op PluginOp)
type PluginOpOption func(*PluginOp)
@@ -111,7 +121,7 @@
func WithLease(leaseID int64) PluginOpOption { return func(op *PluginOp) { op.Lease = leaseID } }
func WithKeyOnly() PluginOpOption { return func(op *PluginOp) { op.KeyOnly = true } }
func WithCountOnly() PluginOpOption { return func(op *PluginOp) { op.CountOnly = true } }
-func WithRegistryOnly() PluginOpOption { return func(op *PluginOp) { op.RegistryOnly = true } }
+func WithGlobal() PluginOpOption { return func(op *PluginOp) { op.Global = true } }
func WithNoneOrder() PluginOpOption { return func(op *PluginOp) { op.SortOrder = SORT_NONE } }
func WithAscendOrder() PluginOpOption { return func(op *PluginOp) { op.SortOrder = SORT_ASCEND } }
func WithDescendOrder() PluginOpOption { return func(op *PluginOp) { op.SortOrder = SORT_DESCEND } }
diff --git a/server/service/cache/common.go b/server/service/cache/common.go
index b17da58..b25c92d 100644
--- a/server/service/cache/common.go
+++ b/server/service/cache/common.go
@@ -17,10 +17,11 @@
package cache
const (
- CTX_FIND_CONSUMER = "consumer"
- CTX_FIND_PROVIDER = "provider"
- CTX_FIND_TAGS = "tags"
+ CTX_FIND_CONSUMER = "_consumer"
+ CTX_FIND_PROVIDER = "_provider"
+ CTX_FIND_TAGS = "_tags"
+ CTX_FIND_REQUEST_REV = "_rev"
- CACHE_FIND = "find"
- CACHE_DEP = "dep"
+ CACHE_FIND = "_find"
+ CACHE_DEP = "_dep"
)
diff --git a/server/service/cache/filter_consumer.go b/server/service/cache/filter_consumer.go
index 387c998..3691cb5 100644
--- a/server/service/cache/filter_consumer.go
+++ b/server/service/cache/filter_consumer.go
@@ -24,7 +24,7 @@
type ConsumerFilter struct {
}
-func (f *ConsumerFilter) Name(ctx context.Context) string {
+func (f *ConsumerFilter) Name(ctx context.Context, _ *cache.Node) string {
return ctx.Value(CTX_FIND_CONSUMER).(string)
}
diff --git a/server/service/cache/filter_instances.go b/server/service/cache/filter_instances.go
index c981ab0..0f26750 100644
--- a/server/service/cache/filter_instances.go
+++ b/server/service/cache/filter_instances.go
@@ -26,24 +26,49 @@
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
"golang.org/x/net/context"
+ "sort"
)
+var clustersIndex = make(map[string]int)
+
+func init() {
+ var clusters []string
+ for name := range registry.Configuration().Clusters {
+ clusters = append(clusters, name)
+ }
+ sort.Strings(clusters)
+ for i, name := range clusters {
+ clustersIndex[name] = i
+ }
+}
+
type InstancesFilter struct {
}
-func (f *InstancesFilter) Name(ctx context.Context) string {
+func (f *InstancesFilter) Name(ctx context.Context, _ *cache.Node) string {
return ""
}
func (f *InstancesFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) {
- provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey)
pCopy := *parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
+ pCopy.Instances, pCopy.Rev, err = f.FindInstances(ctx, pCopy.ServiceIds)
+ if err != nil {
+ return
+ }
+ pCopy.InitBrokenQueue()
+ node = cache.NewNode()
+ node.Cache.Set(CACHE_FIND, &pCopy)
+ return
+}
+
+func (f *InstancesFilter) FindInstances(ctx context.Context, serviceIds []string) (instances []*pb.MicroServiceInstance, rev string, err error) {
+ provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey)
var (
- instances []*pb.MicroServiceInstance
- rev int64
+ maxRevs = make([]int64, len(clustersIndex))
+ counts = make([]int64, len(clustersIndex))
)
- for _, providerServiceId := range pCopy.ServiceIds {
+ for _, providerServiceId := range serviceIds {
key := apt.GenerateInstanceKey(provider.Tenant, providerServiceId, "")
opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key), registry.WithPrefix())
resp, err := backend.Store().Instance().Search(ctx, opts...)
@@ -52,21 +77,20 @@
findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumer.ServiceId,
provider.AppId, provider.ServiceName, provider.Version)
log.Errorf(err, "Instance().Search failed, %s", findFlag)
- return nil, err
+ return nil, "", err
}
for _, kv := range resp.Kvs {
- if kv.ModRevision > rev {
- rev = kv.ModRevision
+ if i, ok := clustersIndex[kv.ClusterName]; ok {
+ if kv.ModRevision > maxRevs[i] {
+ maxRevs[i] = kv.ModRevision
+ }
+ counts[i]++
}
instances = append(instances, kv.Value.(*pb.MicroServiceInstance))
}
+
}
- pCopy.Instances = instances
- pCopy.Rev = serviceUtil.FormatRevision(rev, int64(len(instances)))
-
- node = cache.NewNode()
- node.Cache.Set(CACHE_FIND, &pCopy)
- return
+ return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
}
diff --git a/server/service/cache/filter_permission.go b/server/service/cache/filter_permission.go
index 2a806a5..f2726b1 100644
--- a/server/service/cache/filter_permission.go
+++ b/server/service/cache/filter_permission.go
@@ -28,14 +28,14 @@
type AccessibleFilter struct {
}
-func (f *AccessibleFilter) Name(ctx context.Context) string {
+func (f *AccessibleFilter) Name(ctx context.Context, _ *cache.Node) string {
consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService)
return consumer.ServiceId
}
func (f *AccessibleFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) {
var ids []string
- consumerId := f.Name(ctx)
+ consumerId := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService).ServiceId
pCopy := *parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
for _, providerServiceId := range pCopy.ServiceIds {
if err := serviceUtil.Accessible(ctx, consumerId, providerServiceId); err != nil {
diff --git a/server/service/cache/filter_rev.go b/server/service/cache/filter_rev.go
new file mode 100644
index 0000000..d3eb5a4
--- /dev/null
+++ b/server/service/cache/filter_rev.go
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cache
+
+import (
+ "github.com/apache/servicecomb-service-center/pkg/cache"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
+ "golang.org/x/net/context"
+)
+
+type RevisionFilter struct {
+ InstancesFilter
+}
+
+func (f *RevisionFilter) Name(ctx context.Context, parent *cache.Node) string {
+ item := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
+ requestRev := ctx.Value(CTX_FIND_REQUEST_REV).(string)
+ if len(requestRev) == 0 || requestRev == item.Rev {
+ return ""
+ }
+ return requestRev
+}
+
+func (f *RevisionFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) {
+ item := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
+ requestRev := ctx.Value(CTX_FIND_REQUEST_REV).(string)
+ if len(requestRev) == 0 || requestRev == item.Rev {
+ node = cache.NewNode()
+ node.Cache.Set(CACHE_FIND, item)
+ return
+ }
+
+ if item.BrokenWait() {
+ node = cache.NewNode()
+ node.Cache.Set(CACHE_FIND, item)
+ return
+ }
+
+ cloneCtx := util.CloneContext(ctx)
+ cloneCtx = util.SetContext(cloneCtx, serviceUtil.CTX_NOCACHE, "1")
+
+ insts, _, err := f.FindInstances(cloneCtx, item.ServiceIds)
+ if err != nil {
+ item.InitBrokenQueue()
+ return nil, err
+ }
+
+ log.Warnf("the cache of finding instances api is broken, req[%s]!=cache[%s]",
+ requestRev, item.Rev)
+ item.Instances = insts
+ item.Broken()
+
+ node = cache.NewNode()
+ node.Cache.Set(CACHE_FIND, item)
+ return
+}
diff --git a/server/service/cache/filter_service.go b/server/service/cache/filter_service.go
index 7cb0bba..08fbc4e 100644
--- a/server/service/cache/filter_service.go
+++ b/server/service/cache/filter_service.go
@@ -26,7 +26,7 @@
type ServiceFilter struct {
}
-func (f *ServiceFilter) Name(ctx context.Context) string {
+func (f *ServiceFilter) Name(ctx context.Context, _ *cache.Node) string {
provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey)
return util.StringJoin([]string{
provider.Tenant,
diff --git a/server/service/cache/filter_tags.go b/server/service/cache/filter_tags.go
index e5eb995..41b1981 100644
--- a/server/service/cache/filter_tags.go
+++ b/server/service/cache/filter_tags.go
@@ -31,7 +31,7 @@
type TagsFilter struct {
}
-func (f *TagsFilter) Name(ctx context.Context) string {
+func (f *TagsFilter) Name(ctx context.Context, _ *cache.Node) string {
tags, _ := ctx.Value(CTX_FIND_TAGS).([]string)
sort.Strings(tags)
return strings.Join(tags, ",")
diff --git a/server/service/cache/filter_version.go b/server/service/cache/filter_version.go
index d4fbcab..0c47f6c 100644
--- a/server/service/cache/filter_version.go
+++ b/server/service/cache/filter_version.go
@@ -28,7 +28,7 @@
type VersionRuleFilter struct {
}
-func (f *VersionRuleFilter) Name(ctx context.Context) string {
+func (f *VersionRuleFilter) Name(ctx context.Context, _ *cache.Node) string {
provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey)
return provider.Version
}
diff --git a/server/service/cache/instance.go b/server/service/cache/instance.go
index a0be59f..f7e7f30 100644
--- a/server/service/cache/instance.go
+++ b/server/service/cache/instance.go
@@ -36,7 +36,9 @@
&VersionRuleFilter{},
&TagsFilter{},
&AccessibleFilter{},
- &InstancesFilter{})
+ &InstancesFilter{},
+ &RevisionFilter{},
+ )
}
type VersionRuleCacheItem struct {
@@ -44,17 +46,40 @@
ServiceIds []string
Instances []*pb.MicroServiceInstance
Rev string
+
+ broken bool
+ queue chan struct{}
+}
+
+func (vi *VersionRuleCacheItem) InitBrokenQueue() {
+ if vi.queue == nil {
+ vi.queue = make(chan struct{}, 1)
+ }
+ vi.broken = false
+ vi.queue <- struct{}{}
+}
+
+func (vi *VersionRuleCacheItem) BrokenWait() bool {
+ <-vi.queue
+ return vi.broken
+}
+
+func (vi *VersionRuleCacheItem) Broken() {
+ vi.broken = true
+ close(vi.queue)
}
type FindInstancesCache struct {
*cache.Tree
}
-func (f *FindInstancesCache) Get(ctx context.Context, consumer *pb.MicroService, provider *pb.MicroServiceKey, tags []string) (*VersionRuleCacheItem, error) {
- cloneCtx := context.WithValue(context.WithValue(context.WithValue(ctx,
+func (f *FindInstancesCache) Get(ctx context.Context, consumer *pb.MicroService, provider *pb.MicroServiceKey,
+ tags []string, rev string) (*VersionRuleCacheItem, error) {
+ cloneCtx := context.WithValue(context.WithValue(context.WithValue(context.WithValue(ctx,
CTX_FIND_CONSUMER, consumer),
CTX_FIND_PROVIDER, provider),
- CTX_FIND_TAGS, tags)
+ CTX_FIND_TAGS, tags),
+ CTX_FIND_REQUEST_REV, rev)
node, err := f.Tree.Get(cloneCtx, cache.Options().Temporary(ctx.Value(serviceUtil.CTX_NOCACHE) == "1"))
if node == nil {
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index df0f23f..6dc50f7 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -155,7 +155,7 @@
}
func (h *DependencyEventHandler) dependencyRuleHandle(res interface{}) error {
- ctx := context.Background()
+ ctx := context.WithValue(context.Background(), serviceUtil.CTX_GLOBAL, "1")
dependencyEventHandlerRes := res.(*DependencyEventHandlerResource)
r := dependencyEventHandlerRes.dep
consumerFlag := util.StringJoin([]string{r.Consumer.Environment, r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}, "/")
@@ -204,7 +204,8 @@
func (h *DependencyEventHandler) CleanUp(domainProjects map[string]struct{}) {
for domainProject := range domainProjects {
- if err := serviceUtil.CleanUpDependencyRules(context.Background(), domainProject); err != nil {
+ ctx := context.WithValue(context.Background(), serviceUtil.CTX_GLOBAL, "1")
+ if err := serviceUtil.CleanUpDependencyRules(ctx, domainProject); err != nil {
log.Errorf(err, "clean up '%s' dependency rules failed", domainProject)
}
}
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index 6f2594d..d526d74 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -66,7 +66,9 @@
}
// 查询服务版本信息
- ctx := util.SetContext(context.Background(), serviceUtil.CTX_CACHEONLY, "1")
+ ctx := context.WithValue(context.WithValue(context.Background(),
+ serviceUtil.CTX_CACHEONLY, "1"),
+ serviceUtil.CTX_GLOBAL, "1")
ms, err := serviceUtil.GetService(ctx, domainProject, providerId)
if ms == nil {
log.Errorf(err, "caught [%s] instance[%s/%s] event, get cached provider's file failed",
diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go
index 7ec6864..095dd0e 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -20,7 +20,6 @@
"fmt"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/task"
- "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
@@ -53,7 +52,9 @@
}
func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, providerId string, rev int64) error {
- ctx = util.SetContext(ctx, serviceUtil.CTX_CACHEONLY, "1")
+ ctx = context.WithValue(context.WithValue(ctx,
+ serviceUtil.CTX_CACHEONLY, "1"),
+ serviceUtil.CTX_GLOBAL, "1")
provider, err := serviceUtil.GetService(ctx, domainProject, providerId)
if err != nil {
diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go
index 132a8e5..fb7ea03 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -20,7 +20,6 @@
"fmt"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/task"
- "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
@@ -54,7 +53,9 @@
}
func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumerId string, rev int64) error {
- ctx = util.SetContext(ctx, serviceUtil.CTX_CACHEONLY, "1")
+ ctx = context.WithValue(context.WithValue(ctx,
+ serviceUtil.CTX_CACHEONLY, "1"),
+ serviceUtil.CTX_GLOBAL, "1")
consumer, err := serviceUtil.GetService(ctx, domainProject, consumerId)
if err != nil {
diff --git a/server/service/instance.go b/server/service/instance.go
index ad12ee3..667d675 100644
--- a/server/service/instance.go
+++ b/server/service/instance.go
@@ -566,32 +566,20 @@
// cache
var item *cache.VersionRuleCacheItem
- noCache, cacheOnly := ctx.Value(serviceUtil.CTX_NOCACHE) == "1", ctx.Value(serviceUtil.CTX_CACHEONLY) == "1"
rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(string)
- reqRev, _ := serviceUtil.ParseRevision(rev)
- cloneCtx := util.CloneContext(ctx)
-
- for i := 0; i < 2; i++ {
- item, err = cache.FindInstances.Get(cloneCtx, service, provider, in.Tags)
- if err != nil {
- log.Errorf(err, "FindInstancesCache.Get failed, %s failed", findFlag())
- return &pb.FindInstancesResponse{
- Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
- }, err
- }
- if item == nil {
- mes := fmt.Errorf("%s failed, provider does not exist.", findFlag())
- log.Errorf(mes, "FindInstancesCache.Get failed")
- return &pb.FindInstancesResponse{
- Response: pb.CreateResponse(scerr.ErrServiceNotExists, mes.Error()),
- }, nil
- }
-
- cacheRev, _ := serviceUtil.ParseRevision(item.Rev)
- if noCache || cacheOnly || reqRev <= cacheRev {
- break
- }
- cloneCtx = util.SetContext(cloneCtx, serviceUtil.CTX_NOCACHE, "1")
+ item, err = cache.FindInstances.Get(ctx, service, provider, in.Tags, rev)
+ if err != nil {
+ log.Errorf(err, "FindInstancesCache.Get failed, %s failed", findFlag())
+ return &pb.FindInstancesResponse{
+ Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
+ }, err
+ }
+ if item == nil {
+ mes := fmt.Errorf("%s failed, provider does not exist.", findFlag())
+ log.Errorf(mes, "FindInstancesCache.Get failed")
+ return &pb.FindInstancesResponse{
+ Response: pb.CreateResponse(scerr.ErrServiceNotExists, mes.Error()),
+ }, nil
}
// add dependency queue
@@ -618,8 +606,9 @@
instances := item.Instances
if rev == item.Rev {
- instances = instances[:0]
+ instances = nil // for gRPC
}
+ // TODO support gRPC output context
ctx = util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, item.Rev)
return &pb.FindInstancesResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."),
diff --git a/server/service/instance_test.go b/server/service/instance_test.go
index 3e6e3cf..d40dd63 100644
--- a/server/service/instance_test.go
+++ b/server/service/instance_test.go
@@ -1226,12 +1226,10 @@
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
rev, _ := ctx.Value(serviceUtil.CTX_RESPONSE_REVISION).(string)
- reqRev, reqCount := serviceUtil.ParseRevision(rev)
- Expect(int64(len(respFind.Instances))).To(Equal(reqCount))
Expect(respFind.Instances[0].InstanceId).To(Equal(instanceId8))
- Expect(reqRev).NotTo(Equal(0))
+ Expect(len(rev)).NotTo(Equal(0))
- util.SetContext(ctx, serviceUtil.CTX_REQUEST_REVISION, strconv.FormatInt(reqRev-1, 10))
+ util.SetContext(ctx, serviceUtil.CTX_REQUEST_REVISION, "x")
respFind, err = instanceResource.Find(ctx, &pb.FindInstancesRequest{
ConsumerServiceId: serviceId8,
AppId: "query_instance",
@@ -1240,20 +1238,6 @@
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
- Expect(int64(len(respFind.Instances))).To(Equal(reqCount))
- Expect(respFind.Instances[0].InstanceId).To(Equal(instanceId8))
- Expect(ctx.Value(serviceUtil.CTX_RESPONSE_REVISION)).To(Equal(rev))
-
- util.SetContext(ctx, serviceUtil.CTX_REQUEST_REVISION, strconv.FormatInt(reqRev+1, 10))
- respFind, err = instanceResource.Find(ctx, &pb.FindInstancesRequest{
- ConsumerServiceId: serviceId8,
- AppId: "query_instance",
- ServiceName: "query_instance_with_rev",
- VersionRule: "1.0.0",
- })
- Expect(err).To(BeNil())
- Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
- Expect(int64(len(respFind.Instances))).To(Equal(reqCount))
Expect(respFind.Instances[0].InstanceId).To(Equal(instanceId8))
Expect(ctx.Value(serviceUtil.CTX_RESPONSE_REVISION)).To(Equal(rev))
diff --git a/server/service/util/common.go b/server/service/util/common.go
index 0b182aa..88c5737 100644
--- a/server/service/util/common.go
+++ b/server/service/util/common.go
@@ -18,9 +18,9 @@
const (
HEADER_REV = "X-Resource-Revision"
- CTX_REGISTRYONLY = "_registryOnly"
- CTX_NOCACHE = "_noCache"
- CTX_CACHEONLY = "_cacheOnly"
- CTX_REQUEST_REVISION = "_requestRev"
- CTX_RESPONSE_REVISION = "_responseRev"
+ CTX_GLOBAL = "global"
+ CTX_NOCACHE = "noCache"
+ CTX_CACHEONLY = "cacheOnly"
+ CTX_REQUEST_REVISION = "requestRev"
+ CTX_RESPONSE_REVISION = "responseRev"
)
diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go
index cae6760..302347f 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -17,6 +17,7 @@
package util
import (
+ "crypto/sha1"
"encoding/json"
"fmt"
"github.com/apache/servicecomb-service-center/pkg/log"
@@ -62,18 +63,11 @@
return resp.Kvs[0].Value.(*pb.MicroServiceInstance), nil
}
-func ParseRevision(rev string) (int64, int64) {
- arrRev := strings.Split(rev, ".")
- reqRev, _ := strconv.ParseInt(arrRev[0], 10, 64)
- reqCount := int64(0)
- if len(arrRev) > 1 {
- reqCount, _ = strconv.ParseInt(arrRev[1], 10, 64)
+func FormatRevision(revs, counts []int64) (s string) {
+ for i, rev := range revs {
+ s += fmt.Sprintf("%d.%d,", rev, counts[i])
}
- return reqRev, reqCount
-}
-
-func FormatRevision(rev, count int64) string {
- return fmt.Sprintf("%d.%d", rev, count)
+ return fmt.Sprintf("%x", sha1.Sum(util.StringToBytesWithNoCopy(s)))
}
func GetAllInstancesOfOneService(ctx context.Context, domainProject string, serviceId string) ([]*pb.MicroServiceInstance, error) {
diff --git a/server/service/util/instance_util_test.go b/server/service/util/instance_util_test.go
index 52e1403..abe5182 100644
--- a/server/service/util/instance_util_test.go
+++ b/server/service/util/instance_util_test.go
@@ -25,11 +25,17 @@
)
func TestFormatRevision(t *testing.T) {
- if "1.1" != FormatRevision(1, 1) {
- t.Fatalf("TestFormatRevision failed")
+ // null
+ if x := FormatRevision(nil, nil); "da39a3ee5e6b4b0d3255bfef95601890afd80709" != x {
+ t.Fatalf("TestFormatRevision failed, %s", x)
}
- if a, b := ParseRevision("1.1"); a != 1 || b != 1 {
- t.Fatalf("TestFormatRevision failed")
+ // 1.1,11.1,
+ if x := FormatRevision([]int64{1, 11}, []int64{1, 1}); "87aa7d310290ff4f93248c0aed6870b928edf45a" != x {
+ t.Fatalf("TestFormatRevision failed, %s", x)
+ }
+ // 1.11,1.1,
+ if x := FormatRevision([]int64{1, 1}, []int64{11, 1}); "24675d196e3dea5be0c774cab281366640fc99ef" != x {
+ t.Fatalf("TestFormatRevision failed, %s", x)
}
}
diff --git a/server/service/util/util.go b/server/service/util/util.go
index 2184b29..677a559 100644
--- a/server/service/util/util.go
+++ b/server/service/util/util.go
@@ -29,8 +29,8 @@
case ctx.Value(CTX_CACHEONLY) == "1":
opts = append(opts, registry.WithCacheOnly())
}
- if ctx.Value(CTX_REGISTRYONLY) == "1" {
- opts = append(opts, registry.WithRegistryOnly())
+ if ctx.Value(CTX_GLOBAL) == "1" {
+ opts = append(opts, registry.WithGlobal())
}
return opts
}