blob: 8871a2a26e406145679e2911256ffb633fbb7f8c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package directory
import (
"fmt"
"sync"
)
import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/configurator"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/protocolwrapper"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)
func init() {
extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
}
// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
type RegistryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
listenerLock sync.Mutex
serviceType string
registry registry.Registry
cacheInvokersMap *sync.Map // use sync.map
cacheOriginUrl *common.URL
configurators []config_center.Configurator
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
serviceKey string
forbidden atomic.Bool
}
// NewRegistryDirectory will create a new RegistryDirectory
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
dir := &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
serviceType: url.SubURL.Service(),
registry: registry,
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
go dir.subscribe(url.SubURL)
return dir, nil
}
// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
dir.registry.Subscribe(url, dir)
}
// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
go dir.refreshInvokers(events...)
}
// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
// since in batch mode, the register center handles the different type of events by itself, then notify the directory
// a batch of 'Update' events, instead of omit the different type of event one by one.
func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
var oldInvokers []protocol.Invoker
// in batch mode, it is safe to remove since we have the complete list of events.
if len(events) > 1 {
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
}
for _, event := range events {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
if len(events) > 0 {
dir.setNewInvokers()
}
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
for _, invoker := range oldInvokers {
invoker.Destroy()
}
}
// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
for _, event := range events {
if dir.invokerCacheKey(&event.Service) == key {
return true
}
}
return false
}
// invokerCacheKey generates the key in the cache for a given URL.
func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string {
referenceUrl := dir.GetDirectoryUrl().SubURL
newUrl := common.MergeUrl(url, referenceUrl)
return newUrl.Key()
}
// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
func (dir *RegistryDirectory) setNewInvokers() {
newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
dir.RouterChain().SetInvokers(newInvokers)
}
// cacheInvokerByEvent caches invokers from the service event
func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
// judge is override or others
if event != nil {
u := dir.convertUrl(event)
switch event.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
logger.Infof("selector add service url{%s}", event.Service)
// FIXME: routers are built in every address notification?
dir.configRouters()
return dir.cacheInvoker(u), nil
case remoting.EventTypeDel:
logger.Infof("selector delete service url{%s}", event.Service)
return dir.uncacheInvoker(u), nil
default:
return nil, fmt.Errorf("illegal event type: %v", event.Action)
}
}
return nil, nil
}
// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above.
func (dir *RegistryDirectory) configRouters() {
var urls []*common.URL
for _, v := range config.GetRouterURLSet().Values() {
urls = append(urls, v.(*common.URL))
}
if len(urls) > 0 {
dir.SetRouters(urls)
}
}
// convertUrl processes override:// and router://
func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL {
ret := &res.Service
if ret.Protocol == constant.OVERRIDE_PROTOCOL || // 1.for override url in 2.6.x
ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret))
ret = nil
} else if ret.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
ret = nil
}
return ret
}
func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
var (
err error
newInvokersList []protocol.Invoker
)
groupInvokersMap := make(map[string][]protocol.Invoker)
dir.cacheInvokersMap.Range(func(key, value interface{}) bool {
newInvokersList = append(newInvokersList, value.(protocol.Invoker))
return true
})
for _, invoker := range newInvokersList {
group := invoker.GetUrl().GetParam(constant.GROUP_KEY, "")
if _, ok := groupInvokersMap[group]; ok {
groupInvokersMap[group] = append(groupInvokersMap[group], invoker)
} else {
groupInvokersMap[group] = []protocol.Invoker{invoker}
}
}
groupInvokersList := make([]protocol.Invoker, 0, len(groupInvokersMap))
if len(groupInvokersMap) == 1 {
// len is 1 it means no group setting ,so do not need cluster again
for _, invokers := range groupInvokersMap {
groupInvokersList = invokers
}
} else {
for _, invokers := range groupInvokersMap {
staticDir := directory.NewStaticDirectory(invokers)
cst := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
err = staticDir.BuildRouterChain(invokers)
if err != nil {
logger.Error(err)
continue
}
groupInvokersList = append(groupInvokersList, cst.Join(staticDir))
}
}
return groupInvokersList
}
// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
return dir.uncacheInvokerWithKey(url.Key())
}
func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", key)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); ok {
dir.cacheInvokersMap.Delete(key)
return cacheInvoker.(protocol.Invoker)
}
return nil
}
// cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL
if url == nil && dir.cacheOriginUrl != nil {
url = dir.cacheOriginUrl
} else {
dir.cacheOriginUrl = url
}
if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
return nil
}
// check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
newUrl := common.MergeUrl(url, referenceUrl)
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.IsEquals(*newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil
}
logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
return cacheInvoker.(protocol.Invoker)
}
}
}
return nil
}
// List selected protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
invokers := dir.cacheInvokers
routerChain := dir.RouterChain()
if routerChain == nil {
return invokers
}
return routerChain.Route(invokers, dir.cacheOriginUrl, invocation)
}
// IsAvailable whether the directory is available
func (dir *RegistryDirectory) IsAvailable() bool {
if !dir.BaseDirectory.IsAvailable() {
return dir.BaseDirectory.IsAvailable()
}
for _, ivk := range dir.cacheInvokers {
if ivk.IsAvailable() {
return true
}
}
return false
}
// Destroy method
func (dir *RegistryDirectory) Destroy() {
// TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
invokers := dir.cacheInvokers
dir.cacheInvokers = []protocol.Invoker{}
for _, ivk := range invokers {
ivk.Destroy()
}
})
}
func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) {
doOverrideUrl(dir.configurators, targetUrl)
doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl)
}
func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common.URL) {
for _, v := range configurators {
v.Configure(targetUrl)
}
}
type referenceConfigurationListener struct {
registry.BaseConfigurationListener
directory *RegistryDirectory
url *common.URL
}
func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener {
listener := &referenceConfigurationListener{directory: dir, url: url}
listener.InitWith(
url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
listener,
extension.GetDefaultConfiguratorFunc(),
)
return listener
}
// Process handle events and update Invokers
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
}
type consumerConfigurationListener struct {
registry.BaseConfigurationListener
listeners []registry.NotifyListener
directory *RegistryDirectory
}
func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigurationListener {
listener := &consumerConfigurationListener{directory: dir}
listener.InitWith(
config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
listener,
extension.GetDefaultConfiguratorFunc(),
)
return listener
}
func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) {
l.listeners = append(l.listeners, listener)
}
// Process handles events from Configuration Center and update Invokers
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
}