blob: f7b5acbbce30ada6cc1c76c578ba9ae1ee2b4f51 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package servicecenter
import (
"context"
"crypto/tls"
"github.com/apache/servicecomb-service-center/pkg/client/sc"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/admin/model"
"github.com/apache/servicecomb-service-center/server/core"
scerr "github.com/apache/servicecomb-service-center/server/error"
mgr "github.com/apache/servicecomb-service-center/server/plugin"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
"strings"
"sync"
)
// Package-level state shared by the aggregate client helpers in this file.
var (
// scClient is the aggregate of remote service-center clients; it is created
// and filled in by GetOrCreateSCClient.
scClient *SCClientAggregate
// clientOnce guards the one-time construction of scClient.
clientOnce sync.Once
// clientTLS stores the TLS configuration obtained from the TLS plugin so that
// getClientTLS can reuse it on later calls.
clientTLS *tls.Config
)
// SCClientAggregate is a list of service-center clients. Its methods query the
// clients in order and combine (or return the first of) their results, tagging
// each result with the name of the client configuration it came from.
type SCClientAggregate []*sc.SCClient
// getClientTLS returns the TLS configuration for outgoing service-center
// clients. On the first call (or whenever nothing has been stored yet) the
// configuration is requested from the TLS plugin and remembered in the
// package-level clientTLS variable; subsequent calls return the stored value.
// Whatever the plugin returns is stored, including when it also reports an
// error.
func getClientTLS() (*tls.Config, error) {
	if clientTLS == nil {
		cfg, err := mgr.Plugins().TLS().ClientConfig()
		clientTLS = cfg
		return cfg, err
	}
	return clientTLS, nil
}
// GetScCache fetches the cache dump from every client in the aggregate and
// merges them into a single model.Cache.
//
// A client whose request fails is skipped and its error is recorded in the
// returned map under that client's configured name. The returned cache is nil
// when no client answered successfully; the error map is always non-nil.
func (c *SCClientAggregate) GetScCache(ctx context.Context) (*model.Cache, map[string]error) {
	var merged *model.Cache
	failures := map[string]error{}
	for _, cli := range *c {
		remote, err := cli.GetScCache(ctx)
		cluster := cli.Cfg.Name
		if err != nil {
			failures[cluster] = err
			continue
		}
		// Allocate the result lazily so that "nothing succeeded" is visible as nil.
		if merged == nil {
			merged = new(model.Cache)
		}
		merge := func(dst model.Setter, src model.Getter) {
			c.cacheAppend(cluster, dst, src)
		}
		merge(&merged.Microservices, &remote.Microservices)
		merge(&merged.Indexes, &remote.Indexes)
		merge(&merged.Aliases, &remote.Aliases)
		merge(&merged.Tags, &remote.Tags)
		merge(&merged.Rules, &remote.Rules)
		merge(&merged.RuleIndexes, &remote.RuleIndexes)
		merge(&merged.DependencyRules, &remote.DependencyRules)
		merge(&merged.Summaries, &remote.Summaries)
		merge(&merged.Instances, &remote.Instances)
	}
	return merged, failures
}
// cacheAppend copies every entry visited by getter into setter. Entries whose
// ClusterName is empty or equal to registry.DefaultClusterName are relabelled
// with name before being stored; entries that already carry another cluster
// name are stored unchanged.
func (c *SCClientAggregate) cacheAppend(name string, setter model.Setter, getter model.Getter) {
	visit := func(_ int, kv *model.KV) bool {
		unlabelled := kv.ClusterName == "" || kv.ClusterName == registry.DefaultClusterName
		if unlabelled {
			kv.ClusterName = name
		}
		setter.SetValue(kv)
		// Always true, as in every visit; presumably this tells ForEach to keep iterating.
		return true
	}
	getter.ForEach(visit)
}
// GetSchemasByServiceId asks each client in turn for the schemas of the given
// service and returns the result from the first client that yields a non-nil
// schema list.
//
// Clients that fail with an internal error are logged and skipped; clients
// that return a nil list are skipped silently. If no client produces a list,
// an empty response is returned. The error result is always nil.
func (c *SCClientAggregate) GetSchemasByServiceId(ctx context.Context, domainProject, serviceId string) (*discovery.Response, *scerr.Error) {
	resp := &discovery.Response{}
	for _, cli := range *c {
		found, scErr := cli.GetSchemasByServiceId(ctx, domainProject, serviceId)
		switch {
		case scErr != nil && scErr.InternalError():
			log.Errorf(scErr, "get schema by serviceId[%s/%s] failed", domainProject, serviceId)
			continue
		case found == nil:
			continue
		}

		resp.Count = int64(len(found))
		for _, item := range found {
			kv := &discovery.KeyValue{
				Key:         []byte(core.GenerateServiceSchemaKey(domainProject, serviceId, item.SchemaId)),
				Value:       util.StringToBytesWithNoCopy(item.Schema),
				ModRevision: 0,
				ClusterName: cli.Cfg.Name,
			}
			resp.Kvs = append(resp.Kvs, kv)
		}
		// First cluster holding the service wins; the remaining clients are not queried.
		return resp, nil
	}
	return resp, nil
}
// GetSchemaBySchemaId asks each client in turn for a single schema of the
// given service and returns the first non-nil schema found.
//
// Clients that fail with an internal error are logged and skipped; clients
// that return a nil schema are skipped silently. If no client has the schema,
// an empty response is returned. The error result is always nil.
func (c *SCClientAggregate) GetSchemaBySchemaId(ctx context.Context, domainProject, serviceId, schemaId string) (*discovery.Response, *scerr.Error) {
	var response discovery.Response
	for _, client := range *c {
		schema, err := client.GetSchemaBySchemaId(ctx, domainProject, serviceId, schemaId)
		if err != nil && err.InternalError() {
			// Include the schema id so this failure can be told apart from the
			// list-by-service call, which previously logged the identical text.
			log.Errorf(err, "get schema by schemaId[%s/%s/%s] failed", domainProject, serviceId, schemaId)
			continue
		}
		if schema == nil {
			continue
		}
		response.Count = 1
		response.Kvs = append(response.Kvs, &discovery.KeyValue{
			Key:         []byte(core.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)),
			Value:       util.StringToBytesWithNoCopy(schema.Schema),
			ModRevision: 0,
			ClusterName: client.Cfg.Name,
		})
		// First hit wins; the remaining clients are not queried.
		return &response, nil
	}
	return &response, nil
}
// GetInstancesByServiceId collects the instances of the given provider service
// from every client in the aggregate and returns them in one response.
//
// Unlike the schema lookups, this method does not stop at the first client
// that answers: results from all clients are appended to the response.
// Clients that fail with an internal error are logged and skipped; clients
// that return a nil list are skipped silently. The error result is always nil.
func (c *SCClientAggregate) GetInstancesByServiceId(ctx context.Context, domainProject, providerId, consumerId string) (*discovery.Response, *scerr.Error) {
	var response discovery.Response
	for _, client := range *c {
		insts, err := client.GetInstancesByServiceId(ctx, domainProject, providerId, consumerId)
		if err != nil && err.InternalError() {
			log.Errorf(err, "consumer[%s] get provider[%s/%s] instances failed", consumerId, domainProject, providerId)
			continue
		}
		if insts == nil {
			continue
		}
		// Accumulate rather than overwrite: Kvs grows across clusters, so Count
		// must grow with it to keep Count consistent with len(Kvs).
		response.Count += int64(len(insts))
		for _, instance := range insts {
			response.Kvs = append(response.Kvs, &discovery.KeyValue{
				Key:         []byte(core.GenerateInstanceKey(domainProject, providerId, instance.InstanceId)),
				Value:       instance,
				ModRevision: 0,
				ClusterName: client.Cfg.Name,
			})
		}
	}
	return &response, nil
}
// GetInstanceByInstanceId asks each client in turn for one instance of the
// given provider service and returns the first non-nil instance found.
//
// Clients that fail with an internal error are logged and skipped; clients
// that return a nil instance are skipped silently. If no client has the
// instance, an empty response is returned. The error result is always nil.
func (c *SCClientAggregate) GetInstanceByInstanceId(ctx context.Context, domainProject, providerId, instanceId, consumerId string) (*discovery.Response, *scerr.Error) {
	var response discovery.Response
	for _, client := range *c {
		instance, err := client.GetInstanceByInstanceId(ctx, domainProject, providerId, instanceId, consumerId)
		if err != nil && err.InternalError() {
			// Name the requested instance; the previous message was copied from
			// the list-instances call and did not say which instance failed.
			log.Errorf(err, "consumer[%s] get provider[%s/%s] instance[%s] failed", consumerId, domainProject, providerId, instanceId)
			continue
		}
		if instance == nil {
			continue
		}
		response.Count = 1
		response.Kvs = append(response.Kvs, &discovery.KeyValue{
			Key:         []byte(core.GenerateInstanceKey(domainProject, providerId, instance.InstanceId)),
			Value:       instance,
			ModRevision: 0,
			ClusterName: client.Cfg.Name,
		})
		// First hit wins; the remaining clients are not queried.
		return &response, nil
	}
	return &response, nil
}
// GetOrCreateSCClient returns the process-wide aggregate of service-center
// clients, building it on the first call.
//
// One client is created for every entry in the registry configuration's
// Clusters map, except entries with an empty name or with the name of the
// local cluster (registry.Configuration().ClusterName). Each client gets the
// configured request timeout, and a TLS configuration when its first endpoint
// contains "https". Clusters whose client or TLS setup fails are logged and
// left out of the aggregate.
func GetOrCreateSCClient() *SCClientAggregate {
	clientOnce.Do(func() {
		scClient = &SCClientAggregate{}
		clusters := registry.Configuration().Clusters
		for name, endpoints := range clusters {
			if len(name) == 0 || name == registry.Configuration().ClusterName {
				continue
			}
			client, err := sc.NewSCClient(sc.Config{Name: name, Endpoints: endpoints})
			if err != nil {
				log.Errorf(err, "new service center[%s]%v client failed", name, endpoints)
				continue
			}
			client.Timeout = registry.Configuration().RequestTimeOut
			// TLS is decided by the first endpoint only. The length guard stops
			// an empty endpoint list from panicking inside Once.Do, which would
			// mark initialisation as done and leave the aggregate half-built.
			if len(endpoints) > 0 && strings.Contains(endpoints[0], "https") {
				client.TLS, err = getClientTLS()
				if err != nil {
					log.Errorf(err, "get service center[%s]%v tls config failed", name, endpoints)
					continue
				}
			}
			*scClient = append(*scClient, client)
			log.Infof("new service center[%s]%v client", name, endpoints)
		}
	})
	return scClient
}