// blob: 13cb1ff3528414a3c164c6b634cfd92a3cd17664 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chain
import (
"sort"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
const (
// timeInterval is the period of the ticker created in loop; on every tick the chain checks
// protocol.GetAndRefreshState() to decide whether the address cache must be rebuilt.
timeInterval = 5 * time.Second
)
// RouterChain holds an ordered list of routers together with the invokers they filter, plus a
// cached snapshot of those invokers that Route consults on every call.
type RouterChain struct {
// Latest invoker list handed over through SetInvokers. Guarded by mutex.
invokers []protocol.Invoker
// Containing all routers (builtin ones plus those passed to AddRouters), sorted by priority.
// Replaced as a whole by AddRouters. Guarded by mutex.
routers []router.PriorityRouter
// Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the
// instance will never delete or recreate.
builtinRouters []router.PriorityRouter
// mutex guards invokers and routers.
mutex sync.RWMutex
// url is the URL given to NewRouterChain and returned by URL().
url *common.URL
// The times of address notification since last update for address cache.
// NOTE(review): not read or written by any code visible in this file — confirm it is still needed.
count int64
// The timestamp of last update for address cache.
// NOTE(review): only initialised in NewRouterChain in the code visible here — confirm it is still needed.
last time.Time
// Channel for notify to update the address cache; every receive in loop triggers buildCache.
notify chan struct{}
// Address cache; holds the *InvokerCache stored by buildCache and read by loadCache.
cache atomic.Value
}
// GetNotifyChan returns the chain's notification channel. Each value received from it by the
// chain's loop goroutine triggers a rebuild of the address cache.
func (c *RouterChain) GetNotifyChan() chan struct{} {
return c.notify
}
// Route passes the cached invoker bitmap through every router of the chain, one after another,
// and returns the invokers that are still selected at the end.
// While no cache has been built yet, the raw invoker list is returned unfiltered.
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
	snapshot := c.loadCache()
	if snapshot == nil {
		// No cache yet: fall back to the plain invoker list, read under the lock.
		c.mutex.RLock()
		defer c.mutex.RUnlock()
		return c.invokers
	}

	selected := snapshot.bitmap
	routers := c.copyRouters()
	for i := range routers {
		selected = routers[i].Route(selected, snapshot, url, invocation)
	}

	// Translate the surviving bitmap positions back into invokers of the snapshot.
	positions := selected.ToArray()
	result := make([]protocol.Invoker, 0, len(positions))
	for _, pos := range positions {
		result = append(result, snapshot.invokers[pos])
	}
	return result
}
// AddRouters replaces the chain's router list with the builtin routers plus the given routers.
// The combined list is built in a fresh slice, sorted by priority, and then swapped in under the
// write lock. Afterwards a notification is sent asynchronously so the address cache gets rebuilt.
func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
	builtinCount := len(c.builtinRouters)
	merged := make([]router.PriorityRouter, builtinCount, builtinCount+len(routers))
	copy(merged, c.builtinRouters)
	merged = append(merged, routers...)
	sortRouter(merged)

	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.routers = merged

	// The send happens on its own goroutine so this method never blocks on the channel.
	requestRebuild := func() {
		c.notify <- struct{}{}
	}
	go requestRebuild()
}
// SetInvokers stores the updated invoker list received from the registry center and then sends a
// notification, on a separate goroutine, asking for the address cache to be rebuilt.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
	c.mutex.Lock()
	c.invokers = invokers
	c.mutex.Unlock()

	// Send asynchronously so the caller is not held up waiting for a receiver.
	requestRebuild := func() {
		c.notify <- struct{}{}
	}
	go requestRebuild()
}
// loop runs forever and rebuilds the address cache on two kinds of events: an explicit
// notification on c.notify, or a tick of the timeInterval ticker for which
// protocol.GetAndRefreshState() reports true.
func (c *RouterChain) loop() {
	ticker := time.NewTicker(timeInterval)
	for {
		select {
		case <-c.notify:
			c.buildCache()
		case <-ticker.C:
			if !protocol.GetAndRefreshState() {
				continue
			}
			c.buildCache()
		}
	}
}
// copyRouters returns a snapshot copy of the chain's router list, taken under the read lock.
func (c *RouterChain) copyRouters() []router.PriorityRouter {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	snapshot := make([]router.PriorityRouter, len(c.routers))
	copy(snapshot, c.routers)
	return snapshot
}
// copyInvokers returns a snapshot copy of the invokers received so far, taken under the read
// lock. It returns nil when the chain has no invokers.
func (c *RouterChain) copyInvokers() []protocol.Invoker {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	// len of a nil slice is 0, so this covers both the nil and the empty case.
	total := len(c.invokers)
	if total == 0 {
		return nil
	}
	snapshot := make([]protocol.Invoker, total)
	copy(snapshot, c.invokers)
	return snapshot
}
// loadCache reads the current address cache from the atomic.Value, which guarantees visibility
// across goroutines. It returns nil while nothing has been stored yet.
func (c *RouterChain) loadCache() *InvokerCache {
	if stored := c.cache.Load(); stored != nil {
		return stored.(*InvokerCache)
	}
	return nil
}
// copyInvokerIfNecessary compares the chain's invokers with the snapshot held by cache, to avoid
// copying as much as possible.
//
// When cache is non-nil and its invokers are equivalent to the chain's current invokers (see
// isInvokersChanged), the cache's own slice is returned as-is. Otherwise a fresh copy of the
// chain's invokers is returned, or nil when the chain has none.
func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Invoker {
	var invokers []protocol.Invoker
	if cache != nil {
		invokers = cache.invokers
	}

	c.mutex.RLock()
	defer c.mutex.RUnlock()
	if !isInvokersChanged(invokers, c.invokers) {
		return invokers
	}
	if len(c.invokers) == 0 {
		return nil
	}
	// Copy inline instead of calling copyInvokers: that method takes the read lock itself, and
	// acquiring an RWMutex read lock recursively can deadlock once a writer is queued in between.
	return append(make([]protocol.Invoker, 0, len(c.invokers)), c.invokers...)
}
// buildCache creates a new address cache for the current invokers and publishes it.
// Every poolable router of the chain contributes its address pool and metadata; the routers are
// pooled concurrently and the cache is stored only after all of them have finished.
// Nothing is built (and the previous cache stays in place) when there are no invokers.
func (c *RouterChain) buildCache() {
	previous := c.loadCache()
	invokers := c.copyInvokerIfNecessary(previous)
	if len(invokers) == 0 {
		return
	}

	fresh := BuildCache(invokers)
	var (
		guard   sync.Mutex // serialises writes into the maps of fresh
		pending sync.WaitGroup
	)
	for _, r := range c.copyRouters() {
		poolable, ok := r.(router.Poolable)
		if !ok {
			continue
		}
		pending.Add(1)
		go func(p router.Poolable) {
			defer pending.Done()
			pool, metadata := poolRouter(p, previous, invokers)
			guard.Lock()
			defer guard.Unlock()
			fresh.pools[p.Name()] = pool
			fresh.metadatas[p.Name()] = metadata
		}(poolable)
	}
	pending.Wait()
	c.cache.Store(fresh)
}
// URL returns the URL the RouterChain was created with; it is nil when NewRouterChain was
// called with a nil url.
func (c *RouterChain) URL() *common.URL {
return c.url
}
// NewRouterChain creates a router chain for url.
//
// Every registered router factory is asked to build a priority router; factories that fail or
// return no router are logged and skipped. The resulting routers become the chain's builtin
// routers, and a priority-sorted copy of them becomes the initial router list. A background
// goroutine running loop is started before the chain is returned.
//
// An error is returned only when no router factory is registered at all.
func NewRouterChain(url *common.URL) (*RouterChain, error) {
	routerFactories := extension.GetRouterFactories()
	if len(routerFactories) == 0 {
		return nil, perrors.Errorf("No routerFactory exits , create one please")
	}

	chain := &RouterChain{
		last:   time.Now(),
		notify: make(chan struct{}),
	}

	routers := make([]router.PriorityRouter, 0, len(routerFactories))
	for key, routerFactory := range routerFactories {
		r, err := routerFactory().NewPriorityRouter(url, chain.notify)
		if r == nil || err != nil {
			// err can be nil here (factory returned no router without an error), so it is
			// formatted with %v instead of calling err.Error(), which would panic on nil.
			logger.Errorf("router chain build router fail! routerFactories key:%s error:%v", key, err)
			continue
		}
		routers = append(routers, r)
	}

	// Keep the builtin routers in creation order and sort a separate copy for routing.
	newRouters := make([]router.PriorityRouter, len(routers))
	copy(newRouters, routers)
	sortRouter(newRouters)

	chain.routers = newRouters
	chain.builtinRouters = routers
	chain.url = url

	go chain.loop()
	return chain, nil
}
// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary.
// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
//
// "The address list doesn't change" is detected by slice identity: invokers must have the same
// length and the same backing array as origin.invokers. That is what buildCache passes in when
// copyInvokerIfNecessary found nothing to copy (assuming BuildCache keeps the slice it is given —
// if it stores a copy instead, the check simply never matches and the router is always re-pooled).
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
	name := p.Name()
	if isCacheMiss(origin, name) || p.ShouldPool() {
		logger.Debugf("build address cache for router %q", name)
		return p.Pool(invokers)
	}

	// Comparing &origin.invokers with &invokers would compare the addresses of two distinct
	// slice headers and could never be equal; compare what the slices actually refer to instead.
	sameList := len(origin.invokers) == len(invokers) &&
		(len(invokers) == 0 || &origin.invokers[0] == &invokers[0])
	if !sameList {
		logger.Debugf("build address cache for router %q", name)
		return p.Pool(invokers)
	}

	logger.Debugf("reuse existing address cache for router %q", name)
	return origin.pools[name], origin.metadatas[name]
}
// isCacheMiss reports whether the cache lacks a usable entry for the poolable router named key.
// It returns true when the cache is nil, or the cache's pools are nil, or the cache's invokers
// snapshot is nil, or no pool is stored under key; otherwise it returns false.
func isCacheMiss(cache *InvokerCache, key string) bool {
	switch {
	case cache == nil:
		return true
	case cache.pools == nil, cache.invokers == nil:
		return true
	default:
		return cache.pools[key] == nil
	}
}
// isInvokersChanged reports whether the new invokers on the right differ from the old invokers
// on the left. The lists count as changed when their lengths differ, or when some invoker on the
// right has no counterpart on the left whose URL is equal according to the configured URL
// comparison function (ignoring the timestamp and remote-timestamp parameters).
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
	if len(left) != len(right) {
		return true
	}

nextCandidate:
	for _, candidate := range right {
		wanted := candidate.GetUrl()
		for _, known := range left {
			current := known.GetUrl()
			if common.GetCompareURLEqualFunc()(current, wanted, constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) {
				continue nextCandidate
			}
		}
		// No invoker on the left matches this one.
		return true
	}
	return false
}
// sortRouter orders the given routers in place by ascending priority. The sort is stable, so
// routers with equal priority keep their relative order.
func sortRouter(routers []router.PriorityRouter) {
	ordered := byPriority(routers)
	sort.Stable(ordered)
}
// byPriority adapts a router slice to sort.Interface, ordering by ascending Priority().
type byPriority []router.PriorityRouter

// Len returns the number of routers.
func (p byPriority) Len() int {
	return len(p)
}

// Swap exchanges the routers at positions i and j.
func (p byPriority) Swap(i, j int) {
	p[j], p[i] = p[i], p[j]
}

// Less reports whether the router at i has a strictly lower priority than the router at j.
func (p byPriority) Less(i, j int) bool {
	first, second := p[i].Priority(), p[j].Priority()
	return first < second
}