// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package servicecenter

import (
"fmt"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/admin/model"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
"golang.org/x/net/context"
"strings"
"sync"
"time"
)

var (
cluster *ClusterIndexer
clusterOnce sync.Once
)
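
// ClusterIndexer searches through the aggregated service-center client and
// periodically synchronizes the remote service centers' caches into the
// local cachers.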
type ClusterIndexer struct {
Client *SCClientAggregate
cachers map[discovery.Type]*ServiceCenterCacher
}
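
// Initialize creates the cacher map and the aggregated service-center client.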
func (c *ClusterIndexer) Initialize() {
c.cachers = make(map[discovery.Type]*ServiceCenterCacher)
c.Client = NewSCClientAggregate()
}
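
// Search queries the remote service centers through the aggregated client.
// Only microservice schema keys are supported; any other key returns an error.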
func (c *ClusterIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (r *discovery.Response, err error) {
op := registry.OpGet(opts...)
key := util.BytesToStringWithNoCopy(op.Key)
switch {
case strings.HasPrefix(key, core.GetServiceSchemaRootKey("")):
domainProject, serviceId, schemaId := core.GetInfoFromSchemaKV(op.Key)
var schemas []*pb.Schema
if op.Prefix && len(schemaId) == 0 {
schemas, err = c.Client.GetSchemasByServiceId(domainProject, serviceId)
if err != nil {
return nil, err
}
} else {
schema, err := c.Client.GetSchemaBySchemaId(domainProject, serviceId, schemaId)
if err != nil {
return nil, err
}
schemas = append(schemas, schema)
}
var response discovery.Response
response.Count = int64(len(schemas))
if op.CountOnly {
return &response, nil
}
for _, schema := range schemas {
response.Kvs = append(response.Kvs, &discovery.KeyValue{
Key: util.StringToBytesWithNoCopy(
core.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)),
Value: util.StringToBytesWithNoCopy(schema.Schema),
})
}
return &response, nil
default:
return nil, fmt.Errorf("not implemented")
}
}
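
// Sync pulls the full caches from the remote service centers and notifies the
// registered cachers of the differences as CREATE/UPDATE/DELETE events.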
func (c *ClusterIndexer) Sync(ctx context.Context) error {
cache, errs := c.Client.GetScCache()
if cache == nil && len(errs) > 0 {
err := fmt.Errorf("%v", errs)
log.Errorf(err, "sync failed")
return err
}
// microservice
serviceCacher, ok := c.cachers[backend.SERVICE]
if ok {
c.check(serviceCacher, &cache.Microservices, errs)
}
aliasCacher, ok := c.cachers[backend.SERVICE_ALIAS]
if ok {
c.checkWithConflictHandleFunc(aliasCacher, &cache.Aliases, errs, c.logConflictFunc)
}
indexCacher, ok := c.cachers[backend.SERVICE_INDEX]
if ok {
c.checkWithConflictHandleFunc(indexCacher, &cache.Indexes, errs, c.logConflictFunc)
}
// instance
instCacher, ok := c.cachers[backend.INSTANCE]
if ok {
c.check(instCacher, &cache.Instances, errs)
}
// microservice meta
tagCacher, ok := c.cachers[backend.SERVICE_TAG]
if ok {
c.check(tagCacher, &cache.Tags, errs)
}
ruleCacher, ok := c.cachers[backend.RULE]
if ok {
c.check(ruleCacher, &cache.Rules, errs)
}
ruleIndexCacher, ok := c.cachers[backend.RULE_INDEX]
if ok {
c.check(ruleIndexCacher, &cache.RuleIndexes, errs)
}
depRuleCacher, ok := c.cachers[backend.DEPENDENCY_RULE]
if ok {
c.check(depRuleCacher, &cache.DependencyRules, errs)
}
return nil
}
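
// check diffs a remote slice against the local cacher and silently ignores
// keys that conflict across clusters.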
func (c *ClusterIndexer) check(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error) {
c.checkWithConflictHandleFunc(local, remote, skipClusters, c.skipHandleFunc)
}
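
// checkWithConflictHandleFunc diffs the remote data against the local cache:
// unknown keys trigger CREATE events, changed revisions trigger UPDATE events,
// and local keys missing from the remote side trigger DELETE events. Events of
// clusters listed in skipClusters (temporarily unavailable) are skipped, and
// keys duplicated across clusters are handed to conflictHandleFunc.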
func (c *ClusterIndexer) checkWithConflictHandleFunc(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error,
conflictHandleFunc func(origin *model.KV, conflict model.Getter, index int)) {
exists := make(map[string]*model.KV)
remote.ForEach(func(i int, v *model.KV) bool {
// The remote result may contain the same data as the local cache of the
// current SC, so ignore those entries to prevent the aggregation result
// from growing with duplicates.
if v.ClusterName == registry.Configuration().ClusterName {
return true
}
if kv, ok := exists[v.Key]; ok {
conflictHandleFunc(kv, remote, i)
return true
}
exists[v.Key] = v
kv := local.Cache().Get(v.Key)
newKv := &discovery.KeyValue{
Key: util.StringToBytesWithNoCopy(v.Key),
Value: v.Value,
ModRevision: v.Rev,
ClusterName: v.ClusterName,
}
switch {
case kv == nil:
newKv.Version = 1
newKv.CreateRevision = v.Rev
local.Notify(pb.EVT_CREATE, v.Key, newKv)
case kv.ModRevision != v.Rev:
// if the connection to a cluster failed, skip notifying changes from that
// cluster to avoid publishing wrong change events for its kvs.
if err, ok := skipClusters[kv.ClusterName]; ok {
log.Errorf(err, "cluster[%s] temporarily unavailable, skip cluster[%s] event %s %s",
kv.ClusterName, v.ClusterName, pb.EVT_UPDATE, v.Key)
break
}
// increment the version of the existing kv on update
newKv.Version = kv.Version + 1
newKv.CreateRevision = kv.CreateRevision
local.Notify(pb.EVT_UPDATE, v.Key, newKv)
}
return true
})
var deletes []*discovery.KeyValue
local.Cache().ForEach(func(key string, v *discovery.KeyValue) (next bool) {
var exist bool
remote.ForEach(func(_ int, v *model.KV) bool {
if v.ClusterName == registry.Configuration().ClusterName {
return true
}
exist = v.Key == key
return !exist
})
if !exist {
if err, ok := skipClusters[v.ClusterName]; ok {
log.Errorf(err, "cluster[%s] temporarily unavailable, skip event %s %s",
v.ClusterName, pb.EVT_DELETE, v.Key)
return true
}
deletes = append(deletes, v)
}
return true
})
for _, v := range deletes {
local.Notify(pb.EVT_DELETE, util.BytesToStringWithNoCopy(v.Key), v)
}
}
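
// skipHandleFunc ignores key conflicts between clusters.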
func (c *ClusterIndexer) skipHandleFunc(origin *model.KV, conflict model.Getter, index int) {
}
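
// logConflictFunc logs a warning when the same microservice index or alias
// maps to different service ids in different clusters.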
func (c *ClusterIndexer) logConflictFunc(origin *model.KV, conflict model.Getter, index int) {
switch slice := conflict.(type) {
case *model.MicroserviceIndexSlice:
kv := (*slice)[index]
if serviceId := origin.Value.(string); kv.Value != serviceId {
key := core.GetInfoFromSvcIndexKV(util.StringToBytesWithNoCopy(kv.Key))
log.Warnf("conflict! can not merge microservice index[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
serviceId, origin.ClusterName)
}
case *model.MicroserviceAliasSlice:
kv := (*slice)[index]
if serviceId := origin.Value.(string); kv.Value != serviceId {
key := core.GetInfoFromSvcAliasKV(util.StringToBytesWithNoCopy(kv.Key))
log.Warnf("conflict! can not merge microservice alias[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
serviceId, origin.ClusterName)
}
}
}
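
// loop runs Sync periodically until ctx is done. The first sync happens after
// minWaitInterval; afterwards the configured AutoSyncInterval is used, and a
// zero interval disables the periodic sync.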
func (c *ClusterIndexer) loop(ctx context.Context) {
select {
case <-ctx.Done():
case <-time.After(minWaitInterval):
c.Sync(ctx)
d := registry.Configuration().AutoSyncInterval
if d == 0 {
return
}
loop:
for {
select {
case <-ctx.Done():
break loop
case <-time.After(d):
// TODO support watching sc
c.Sync(ctx)
}
}
}
log.Debug("service center client is stopped")
}

// AddCacher registers the cacher for the given discovery type.
// It is not safe for concurrent use.
func (c *ClusterIndexer) AddCacher(t discovery.Type, cacher *ServiceCenterCacher) {
c.cachers[t] = cacher
}
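
// Run initializes the indexer and starts the background sync loop.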
func (c *ClusterIndexer) Run() {
c.Initialize()
gopool.Go(c.loop)
}
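
// Stop is a no-op.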
func (c *ClusterIndexer) Stop() {}
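
// Ready returns the package-level closedCh channel to report readiness.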
func (c *ClusterIndexer) Ready() <-chan struct{} {
return closedCh
}
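
// GetOrCreateClusterIndexer returns the singleton ClusterIndexer, creating and
// starting it on the first call.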
func GetOrCreateClusterIndexer() *ClusterIndexer {
clusterOnce.Do(func() {
cluster = &ClusterIndexer{}
cluster.Run()
})
return cluster
}