| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package directory |
| |
| import ( |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| perrors "github.com/pkg/errors" |
| "go.uber.org/atomic" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go/cluster/directory" |
| "github.com/apache/dubbo-go/common" |
| "github.com/apache/dubbo-go/common/constant" |
| "github.com/apache/dubbo-go/common/extension" |
| "github.com/apache/dubbo-go/common/logger" |
| "github.com/apache/dubbo-go/config" |
| "github.com/apache/dubbo-go/config_center" |
| _ "github.com/apache/dubbo-go/config_center/configurator" |
| "github.com/apache/dubbo-go/protocol" |
| "github.com/apache/dubbo-go/protocol/protocolwrapper" |
| "github.com/apache/dubbo-go/registry" |
| "github.com/apache/dubbo-go/remoting" |
| ) |
| |
// Options holds the tunable settings of a registryDirectory.
type Options struct {
	// serviceTTL is the service time-to-live. NewRegistryDirectory
	// initialises it to 300 seconds before applying any Option.
	serviceTTL time.Duration
}
| |
| // Option ... |
| type Option func(*Options) |
| |
| type registryDirectory struct { |
| directory.BaseDirectory |
| cacheInvokers []protocol.Invoker |
| listenerLock sync.Mutex |
| serviceType string |
| registry registry.Registry |
| cacheInvokersMap *sync.Map //use sync.map |
| cacheOriginUrl *common.URL |
| configurators []config_center.Configurator |
| consumerConfigurationListener *consumerConfigurationListener |
| referenceConfigurationListener *referenceConfigurationListener |
| Options |
| serviceKey string |
| forbidden atomic.Bool |
| } |
| |
| // NewRegistryDirectory ... |
| func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) { |
| options := Options{ |
| //default 300s |
| serviceTTL: time.Duration(300e9), |
| } |
| for _, opt := range opts { |
| opt(&options) |
| } |
| if url.SubURL == nil { |
| return nil, perrors.Errorf("url is invalid, suburl can not be nil") |
| } |
| dir := ®istryDirectory{ |
| BaseDirectory: directory.NewBaseDirectory(url), |
| cacheInvokers: []protocol.Invoker{}, |
| cacheInvokersMap: &sync.Map{}, |
| serviceType: url.SubURL.Service(), |
| registry: registry, |
| Options: options, |
| } |
| dir.consumerConfigurationListener = newConsumerConfigurationListener(dir) |
| return dir, nil |
| } |
| |
| //subscribe from registry |
| func (dir *registryDirectory) Subscribe(url *common.URL) { |
| dir.consumerConfigurationListener.addNotifyListener(dir) |
| dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) |
| dir.registry.Subscribe(url, dir) |
| } |
| |
| func (dir *registryDirectory) Notify(event *registry.ServiceEvent) { |
| go dir.update(event) |
| } |
| |
| //subscribe service from registry, and update the cacheServices |
| func (dir *registryDirectory) update(res *registry.ServiceEvent) { |
| if res == nil { |
| return |
| } |
| logger.Debugf("registry update, result{%s}", res) |
| logger.Debugf("update service name: %s!", res.Service) |
| dir.refreshInvokers(res) |
| } |
| |
| func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { |
| var ( |
| url *common.URL |
| oldInvoker protocol.Invoker = nil |
| ) |
| //judge is override or others |
| if res != nil { |
| url = &res.Service |
| //1.for override url in 2.6.x |
| if url.Protocol == constant.OVERRIDE_PROTOCOL || |
| url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY { |
| dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url)) |
| url = nil |
| } else if url.Protocol == constant.ROUTER_PROTOCOL || //2.for router |
| url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY { |
| url = nil |
| } |
| switch res.Action { |
| case remoting.EventTypeAdd, remoting.EventTypeUpdate: |
| logger.Infof("selector add service url{%s}", res.Service) |
| var urls []*common.URL |
| |
| for _, v := range directory.GetRouterURLSet().Values() { |
| urls = append(urls, v.(*common.URL)) |
| } |
| |
| if len(urls) > 0 { |
| dir.SetRouters(urls) |
| } |
| |
| //dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL) |
| oldInvoker = dir.cacheInvoker(url) |
| case remoting.EventTypeDel: |
| oldInvoker = dir.uncacheInvoker(url) |
| logger.Infof("selector delete service url{%s}", res.Service) |
| default: |
| return |
| } |
| } |
| |
| newInvokers := dir.toGroupInvokers() |
| dir.listenerLock.Lock() |
| dir.cacheInvokers = newInvokers |
| dir.listenerLock.Unlock() |
| // After dir.cacheInvokers is updated,destroy the oldInvoker |
| // Ensure that no request will enter the oldInvoker |
| if oldInvoker != nil { |
| oldInvoker.Destroy() |
| } |
| |
| } |
| |
| func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { |
| newInvokersList := []protocol.Invoker{} |
| groupInvokersMap := make(map[string][]protocol.Invoker) |
| groupInvokersList := []protocol.Invoker{} |
| |
| dir.cacheInvokersMap.Range(func(key, value interface{}) bool { |
| newInvokersList = append(newInvokersList, value.(protocol.Invoker)) |
| return true |
| }) |
| |
| for _, invoker := range newInvokersList { |
| group := invoker.GetUrl().GetParam(constant.GROUP_KEY, "") |
| |
| if _, ok := groupInvokersMap[group]; ok { |
| groupInvokersMap[group] = append(groupInvokersMap[group], invoker) |
| } else { |
| groupInvokersMap[group] = []protocol.Invoker{invoker} |
| } |
| } |
| if len(groupInvokersMap) == 1 { |
| //len is 1 it means no group setting ,so do not need cluster again |
| for _, invokers := range groupInvokersMap { |
| groupInvokersList = invokers |
| } |
| } else { |
| for _, invokers := range groupInvokersMap { |
| staticDir := directory.NewStaticDirectory(invokers) |
| cluster := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) |
| staticDir.BuildRouterChain(invokers) |
| groupInvokersList = append(groupInvokersList, cluster.Join(staticDir)) |
| } |
| } |
| |
| return groupInvokersList |
| } |
| |
| // uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil |
| func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { |
| logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) |
| if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok { |
| dir.cacheInvokersMap.Delete(url.Key()) |
| return cacheInvoker.(protocol.Invoker) |
| } |
| return nil |
| } |
| |
| // cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil |
| func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { |
| dir.overrideUrl(dir.GetDirectoryUrl()) |
| referenceUrl := dir.GetDirectoryUrl().SubURL |
| |
| if url == nil && dir.cacheOriginUrl != nil { |
| url = dir.cacheOriginUrl |
| } else { |
| dir.cacheOriginUrl = url |
| } |
| if url == nil { |
| logger.Error("URL is nil ,pls check if service url is subscribe successfully!") |
| return nil |
| } |
| //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol |
| if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { |
| newUrl := common.MergeUrl(url, referenceUrl) |
| dir.overrideUrl(newUrl) |
| if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok { |
| logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl) |
| newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) |
| if newInvoker != nil { |
| dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) |
| } |
| } else { |
| logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) |
| newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) |
| if newInvoker != nil { |
| dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) |
| return cacheInvoker.(protocol.Invoker) |
| } |
| } |
| } |
| return nil |
| } |
| |
| //select the protocol invokers from the directory |
| func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { |
| invokers := dir.cacheInvokers |
| routerChain := dir.RouterChain() |
| |
| if routerChain == nil { |
| return invokers |
| } |
| return routerChain.Route(invokers, dir.cacheOriginUrl, invocation) |
| } |
| |
| func (dir *registryDirectory) IsAvailable() bool { |
| if !dir.BaseDirectory.IsAvailable() { |
| return dir.BaseDirectory.IsAvailable() |
| } |
| |
| for _, ivk := range dir.cacheInvokers { |
| if ivk.IsAvailable() { |
| return true |
| } |
| } |
| |
| return false |
| } |
| |
| func (dir *registryDirectory) Destroy() { |
| //TODO:unregister & unsubscribe |
| dir.BaseDirectory.Destroy(func() { |
| invokers := dir.cacheInvokers |
| dir.cacheInvokers = []protocol.Invoker{} |
| for _, ivk := range invokers { |
| ivk.Destroy() |
| } |
| }) |
| } |
| |
| func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) { |
| doOverrideUrl(dir.configurators, targetUrl) |
| doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl) |
| doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl) |
| } |
| |
| func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common.URL) { |
| for _, v := range configurators { |
| v.Configure(targetUrl) |
| } |
| } |
| |
| type referenceConfigurationListener struct { |
| registry.BaseConfigurationListener |
| directory *registryDirectory |
| url *common.URL |
| } |
| |
| func newReferenceConfigurationListener(dir *registryDirectory, url *common.URL) *referenceConfigurationListener { |
| listener := &referenceConfigurationListener{directory: dir, url: url} |
| listener.InitWith( |
| url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX, |
| listener, |
| extension.GetDefaultConfiguratorFunc(), |
| ) |
| return listener |
| } |
| |
| func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { |
| l.BaseConfigurationListener.Process(event) |
| l.directory.refreshInvokers(nil) |
| } |
| |
| type consumerConfigurationListener struct { |
| registry.BaseConfigurationListener |
| listeners []registry.NotifyListener |
| directory *registryDirectory |
| } |
| |
| func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener { |
| listener := &consumerConfigurationListener{directory: dir} |
| listener.InitWith( |
| config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX, |
| listener, |
| extension.GetDefaultConfiguratorFunc(), |
| ) |
| return listener |
| } |
| |
| func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) { |
| l.listeners = append(l.listeners, listener) |
| } |
| |
| func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { |
| l.BaseConfigurationListener.Process(event) |
| l.directory.refreshInvokers(nil) |
| } |