blob: aa0a65479549fdf955d2525523e7fda5d181ef20 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package aggregate
import (
"fmt"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/state"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
"github.com/apache/servicecomb-service-center/pkg/log"
)
// Aggregator implements state.State.
// Aggregator is an aggregator of multi Adaptors, and it aggregates all the
// Adaptors' data as it's result.
type Aggregator struct {
// Indexer searches data from all the adapters
kvstore.Indexer
Type kvstore.Type
Adaptors []state.State
}
// Cache gets all the adapters' cache
func (as *Aggregator) Cache() kvstore.CacheReader {
var cache Cache
for _, a := range as.Adaptors {
cache = append(cache, a.Cache())
}
return cache
}
func (as *Aggregator) Run() {
for _, a := range as.Adaptors {
a.Run()
}
}
func (as *Aggregator) Stop() {
for _, a := range as.Adaptors {
a.Stop()
}
}
func (as *Aggregator) Ready() <-chan struct{} {
for _, a := range as.Adaptors {
<-a.Ready()
}
return closedCh
}
func getLogConflictFunc(t kvstore.Type) func(origin, conflict *kvstore.KeyValue) {
switch t {
case sd.TypeServiceIndex:
return func(origin, conflict *kvstore.KeyValue) {
if serviceID, conflictID := origin.Value.(string), conflict.Value.(string); conflictID != serviceID {
key := path.GetInfoFromSvcIndexKV(conflict.Key)
log.Warn(fmt.Sprintf("conflict! can not merge microservice index[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
conflict.ClusterName, conflictID, key.Environment, key.AppId, key.ServiceName, key.Version,
serviceID, origin.ClusterName))
}
}
case sd.TypeServiceAlias:
return func(origin, conflict *kvstore.KeyValue) {
if serviceID, conflictID := origin.Value.(string), conflict.Value.(string); conflictID != serviceID {
key := path.GetInfoFromSvcAliasKV(conflict.Key)
log.Warn(fmt.Sprintf("conflict! can not merge microservice alias[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
conflict.ClusterName, conflictID, key.Environment, key.AppId, key.ServiceName, key.Version,
serviceID, origin.ClusterName))
}
}
}
return nil
}
func NewAggregator(t kvstore.Type, cfg *kvstore.Options) *Aggregator {
as := &Aggregator{Type: t}
for _, name := range repos {
// create and get all plugin instances
repo, err := state.NewRepository(state.Config{Kind: name})
if err != nil {
log.Error(fmt.Sprintf("failed to new plugin instance[%s]", name), err)
continue
}
as.Adaptors = append(as.Adaptors, repo.New(t, cfg))
}
as.Indexer = NewAggregatorIndexer(as)
switch t {
case sd.TypeServiceIndex, sd.TypeServiceAlias:
NewConflictChecker(as.Cache(), getLogConflictFunc(t))
}
return as
}