| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package chain |
| |
| import ( |
| "sort" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| perrors "github.com/pkg/errors" |
| "go.uber.org/atomic" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go/cluster/router" |
| "github.com/apache/dubbo-go/common" |
| "github.com/apache/dubbo-go/common/constant" |
| "github.com/apache/dubbo-go/common/extension" |
| "github.com/apache/dubbo-go/common/logger" |
| "github.com/apache/dubbo-go/protocol" |
| ) |
| |
| var ( |
| virtualServiceConfigByte []byte |
| destinationRuleConfigByte []byte |
| ) |
| |
| // RouterChain Router chain |
| type RouterChain struct { |
| // Full list of addresses from registry, classified by method name. |
| invokers []protocol.Invoker |
| // Containing all routers, reconstruct every time 'route://' urls change. |
| routers []router.PriorityRouter |
| // Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the |
| // instance will never delete or recreate. |
| builtinRouters []router.PriorityRouter |
| |
| mutex sync.RWMutex |
| |
| url *common.URL |
| |
| // The times of address notification since last update for address cache |
| count int64 |
| // The timestamp of last update for address cache |
| last time.Time |
| // Channel for notify to update the address cache |
| notify chan struct{} |
| // Address cache |
| cache atomic.Value |
| } |
| |
| func (c *RouterChain) GetNotifyChan() chan struct{} { |
| return c.notify |
| } |
| |
| // Route Loop routers in RouterChain and call Route method to determine the target invokers list. |
| func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker { |
| finalInvokers := c.invokers |
| for _, r := range c.copyRouters() { |
| finalInvokers = r.Route(c.invokers, url, invocation) |
| } |
| return finalInvokers |
| } |
| |
| // AddRouters Add routers to router chain |
| // New a array add builtinRouters which is not sorted in RouterChain and routers |
| // Sort the array |
| // Replace router array in RouterChain |
| func (c *RouterChain) AddRouters(routers []router.PriorityRouter) { |
| newRouters := make([]router.PriorityRouter, 0, len(c.builtinRouters)+len(routers)) |
| newRouters = append(newRouters, c.builtinRouters...) |
| newRouters = append(newRouters, routers...) |
| sortRouter(newRouters) |
| c.mutex.Lock() |
| defer c.mutex.Unlock() |
| c.routers = newRouters |
| go func() { |
| c.notify <- struct{}{} |
| }() |
| } |
| |
| // SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and |
| // time interval exceeds timeThreshold since last cache update, then notify to update the cache. |
| func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) { |
| c.mutex.Lock() |
| c.invokers = invokers |
| c.mutex.Unlock() |
| |
| go func() { |
| c.notify <- struct{}{} |
| }() |
| } |
| |
| // copyRouters make a snapshot copy from RouterChain's router list. |
| func (c *RouterChain) copyRouters() []router.PriorityRouter { |
| c.mutex.RLock() |
| defer c.mutex.RUnlock() |
| ret := make([]router.PriorityRouter, 0, len(c.routers)) |
| ret = append(ret, c.routers...) |
| return ret |
| } |
| |
| // copyInvokers copies a snapshot of the received invokers. |
| func (c *RouterChain) copyInvokers() []protocol.Invoker { |
| c.mutex.RLock() |
| defer c.mutex.RUnlock() |
| if c.invokers == nil || len(c.invokers) == 0 { |
| return nil |
| } |
| ret := make([]protocol.Invoker, 0, len(c.invokers)) |
| ret = append(ret, c.invokers...) |
| return ret |
| } |
| |
| func SetVSAndDRConfigByte(vs, dr []byte) { |
| virtualServiceConfigByte = vs |
| destinationRuleConfigByte = dr |
| } |
| |
| // NewRouterChain Use url to init router chain |
| // Loop routerFactories and call NewRouter method |
| func NewRouterChain(url *common.URL) (*RouterChain, error) { |
| routerFactories := extension.GetRouterFactories() |
| if len(routerFactories) == 0 { |
| return nil, perrors.Errorf("No routerFactory exits , create one please") |
| } |
| |
| chain := &RouterChain{ |
| last: time.Now(), |
| notify: make(chan struct{}), |
| } |
| |
| routers := make([]router.PriorityRouter, 0, len(routerFactories)) |
| |
| for key, routerFactory := range routerFactories { |
| if virtualServiceConfigByte == nil || destinationRuleConfigByte == nil { |
| logger.Warnf("virtual Service Config or destinationRule Confi Byte may be empty, pls check your CONF_VIRTUAL_SERVICE_FILE_PATH and CONF_DEST_RULE_FILE_PATH env is correctly point to your yaml file\n") |
| } |
| r, err := routerFactory().NewPriorityRouter(virtualServiceConfigByte, destinationRuleConfigByte, chain.notify) |
| if r == nil || err != nil { |
| logger.Errorf("router chain build router fail! routerFactories key:%s error:%vv", key, err) |
| continue |
| } |
| routers = append(routers, r) |
| } |
| |
| newRouters := make([]router.PriorityRouter, len(routers)) |
| copy(newRouters, routers) |
| |
| sortRouter(newRouters) |
| |
| routerNeedsUpdateInit := atomic.Bool{} |
| routerNeedsUpdateInit.Store(false) |
| chain.routers = newRouters |
| chain.builtinRouters = routers |
| if url != nil { |
| chain.url = url |
| } |
| |
| return chain, nil |
| } |
| |
| // isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left. |
| func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool { |
| if len(right) != len(left) { |
| return true |
| } |
| |
| for _, r := range right { |
| found := false |
| rurl := r.GetUrl() |
| for _, l := range left { |
| lurl := l.GetUrl() |
| if common.GetCompareURLEqualFunc()(lurl, rurl, constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) { |
| found = true |
| break |
| } |
| } |
| if !found { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // sortRouter Sort router instance by priority with stable algorithm |
| func sortRouter(routers []router.PriorityRouter) { |
| sort.Stable(byPriority(routers)) |
| } |
| |
| // byPriority Sort by priority |
| type byPriority []router.PriorityRouter |
| |
| func (a byPriority) Len() int { return len(a) } |
| func (a byPriority) Swap(i, j int) { a[i], a[j] = a[j], a[i] } |
| func (a byPriority) Less(i, j int) bool { return a[i].Priority() < a[j].Priority() } |