blob: a6d2cdf49b0935b2402e03208d1ff5f702e1cc52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package directory
import (
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/configurator"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/protocolwrapper"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)
// Options ...
type Options struct {
serviceTTL time.Duration
}
// Option ...
type Option func(*Options)
type registryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
listenerLock sync.Mutex
serviceType string
registry registry.Registry
cacheInvokersMap *sync.Map //use sync.map
cacheOriginUrl *common.URL
configurators []config_center.Configurator
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
Options
serviceKey string
forbidden atomic.Bool
}
// NewRegistryDirectory ...
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
options := Options{
//default 300s
serviceTTL: time.Duration(300e9),
}
for _, opt := range opts {
opt(&options)
}
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
dir := &registryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
serviceType: url.SubURL.Service(),
registry: registry,
Options: options,
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
return dir, nil
}
//subscribe from registry
func (dir *registryDirectory) Subscribe(url *common.URL) {
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
dir.registry.Subscribe(url, dir)
}
func (dir *registryDirectory) Notify(event *registry.ServiceEvent) {
go dir.update(event)
}
//subscribe service from registry, and update the cacheServices
func (dir *registryDirectory) update(res *registry.ServiceEvent) {
if res == nil {
return
}
logger.Debugf("registry update, result{%s}", res)
logger.Debugf("update service name: %s!", res.Service)
dir.refreshInvokers(res)
}
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var (
url *common.URL
oldInvoker protocol.Invoker = nil
)
//judge is override or others
if res != nil {
url = &res.Service
//1.for override url in 2.6.x
if url.Protocol == constant.OVERRIDE_PROTOCOL ||
url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url))
url = nil
} else if url.Protocol == constant.ROUTER_PROTOCOL || //2.for router
url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
url = nil
}
switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
logger.Infof("selector add service url{%s}", res.Service)
var urls []*common.URL
for _, v := range directory.GetRouterURLSet().Values() {
urls = append(urls, v.(*common.URL))
}
if len(urls) > 0 {
dir.SetRouters(urls)
}
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel:
oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
}
}
newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
dir.cacheInvokers = newInvokers
dir.listenerLock.Unlock()
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
if oldInvoker != nil {
oldInvoker.Destroy()
}
}
func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
newInvokersList := []protocol.Invoker{}
groupInvokersMap := make(map[string][]protocol.Invoker)
groupInvokersList := []protocol.Invoker{}
dir.cacheInvokersMap.Range(func(key, value interface{}) bool {
newInvokersList = append(newInvokersList, value.(protocol.Invoker))
return true
})
for _, invoker := range newInvokersList {
group := invoker.GetUrl().GetParam(constant.GROUP_KEY, "")
if _, ok := groupInvokersMap[group]; ok {
groupInvokersMap[group] = append(groupInvokersMap[group], invoker)
} else {
groupInvokersMap[group] = []protocol.Invoker{invoker}
}
}
if len(groupInvokersMap) == 1 {
//len is 1 it means no group setting ,so do not need cluster again
for _, invokers := range groupInvokersMap {
groupInvokersList = invokers
}
} else {
for _, invokers := range groupInvokersMap {
staticDir := directory.NewStaticDirectory(invokers)
cluster := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
staticDir.BuildRouterChain(invokers)
groupInvokersList = append(groupInvokersList, cluster.Join(staticDir))
}
}
return groupInvokersList
}
// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
return cacheInvoker.(protocol.Invoker)
}
return nil
}
// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL
if url == nil && dir.cacheOriginUrl != nil {
url = dir.cacheOriginUrl
} else {
dir.cacheOriginUrl = url
}
if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
return nil
}
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
newUrl := common.MergeUrl(url, referenceUrl)
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
return cacheInvoker.(protocol.Invoker)
}
}
}
return nil
}
//select the protocol invokers from the directory
func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
invokers := dir.cacheInvokers
routerChain := dir.RouterChain()
if routerChain == nil {
return invokers
}
return routerChain.Route(invokers, dir.cacheOriginUrl, invocation)
}
func (dir *registryDirectory) IsAvailable() bool {
if !dir.BaseDirectory.IsAvailable() {
return dir.BaseDirectory.IsAvailable()
}
for _, ivk := range dir.cacheInvokers {
if ivk.IsAvailable() {
return true
}
}
return false
}
func (dir *registryDirectory) Destroy() {
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
invokers := dir.cacheInvokers
dir.cacheInvokers = []protocol.Invoker{}
for _, ivk := range invokers {
ivk.Destroy()
}
})
}
func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) {
doOverrideUrl(dir.configurators, targetUrl)
doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl)
}
func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common.URL) {
for _, v := range configurators {
v.Configure(targetUrl)
}
}
type referenceConfigurationListener struct {
registry.BaseConfigurationListener
directory *registryDirectory
url *common.URL
}
func newReferenceConfigurationListener(dir *registryDirectory, url *common.URL) *referenceConfigurationListener {
listener := &referenceConfigurationListener{directory: dir, url: url}
listener.InitWith(
url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
listener,
extension.GetDefaultConfiguratorFunc(),
)
return listener
}
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
l.directory.refreshInvokers(nil)
}
type consumerConfigurationListener struct {
registry.BaseConfigurationListener
listeners []registry.NotifyListener
directory *registryDirectory
}
func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener {
listener := &consumerConfigurationListener{directory: dir}
listener.InitWith(
config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
listener,
extension.GetDefaultConfiguratorFunc(),
)
return listener
}
func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) {
l.listeners = append(l.listeners, listener)
}
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
l.directory.refreshInvokers(nil)
}