blob: 984ecb4eeff71dd692cbda8db5e3b87dbdde4254 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tag
import (
"strconv"
"strings"
"sync"
)
import (
"github.com/RoaringBitmap/roaring"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/remoting"
)
// Identifiers used by the tag router and as prefixes of the keys in its address pool.
const (
// name is the value returned by tagRouter.Name and addrMetadata.Source.
name = "tag-router"
// staticPrefix prefixes the pool keys built from the tag parameter found on invoker URLs.
staticPrefix = "static-"
// dynamicPrefix prefixes the pool keys built from the tag names of the dynamic tag rule.
dynamicPrefix = "dynamic-"
)
// addrMetadata keeps snapshot data for the application name, and some information extracted from the dynamic tag
// rule, in order to make the Route() method lock-free. It is built by tagRouter.Pool while holding the router's
// mutex; the rule flags stay false when no dynamic rule is present at that time.
type addrMetadata struct {
// application name, copied from tagRouter.application
application string
// is rule a runtime rule
//ruleRuntime bool
// is rule a force rule, copied from the rule's Force field
ruleForce bool
// is rule a valid rule, copied from the rule's Valid field
ruleValid bool
// is rule an enabled rule, copied from the rule's Enabled field
ruleEnabled bool
}
// Source indicates where the metadata comes from; it always returns the tag router's name ("tag-router").
func (m *addrMetadata) Source() string {
return name
}
// tagRouter routes requests by tag, using static tags found on invoker URLs and, when available, a dynamic tag
// rule obtained from the config center.
type tagRouter struct {
// url is the router URL handed to NewTagRouter.
url *common.URL
// tagRouterRule is the current dynamic tag rule, nil when there is none.
tagRouterRule *RouterRule
// enabled is read from the router URL at construction time.
enabled bool
// priority is read from the router URL at construction time.
priority int64
// application is the provider application whose dynamic tag rule this router listens to.
application string
// ruleChanged is set when Process installs a new rule and cleared by Pool; ShouldPool returns it.
ruleChanged bool
// mutex is held by Process and Pool while they update tagRouterRule and ruleChanged, and by ShouldPool
// while it reads ruleChanged.
mutex sync.RWMutex
}
// NewTagRouter builds a tagRouter from the given router URL.
//
// The enabled flag (default true) and the priority (default 0) are read from the URL parameters. A nil URL is
// rejected with an error and a nil router.
func NewTagRouter(url *common.URL) (*tagRouter, error) {
	if url == nil {
		return nil, perrors.Errorf("Illegal route URL!")
	}

	created := &tagRouter{url: url}
	created.enabled = url.GetParamBool(constant.RouterEnabled, true)
	created.priority = url.GetParamInt(constant.RouterPriority, 0)
	return created, nil
}
// isEnabled reports whether this router is enabled, as read from the router URL when the router was created.
// nolint
func (c *tagRouter) isEnabled() bool {
return c.enabled
}
// Route narrows the given invokers down to the ones selected by the tag of the request.
//
// The input bitmap is returned untouched when the router is disabled or there is nothing to route. Otherwise the
// metadata snapshot stored in the cache decides which strategy applies: the dynamic tag rule when it is both valid
// and enabled, the static tags found on the invoker URLs in every other case.
func (c *tagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
	switch {
	case !c.isEnabled(), invokers.IsEmpty():
		return invokers
	case shouldUseDynamicTag(cache.FindAddrMeta(c)):
		return c.routeWithDynamicTag(invokers, cache, url, invocation)
	default:
		return c.routeWithStaticTag(invokers, cache, url, invocation)
	}
}
// routeWithStaticTag routes using only the static tags pooled from the invoker URLs.
//
// A request without a tag keeps all invokers. A tagged request gets the invokers pooled under that static tag; if
// none match, all invokers are returned unless the request forces the use of the tag, in which case the empty
// result is returned.
func (c *tagRouter) routeWithStaticTag(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
	requested := findTag(invocation, url)
	if len(requested) == 0 {
		return invokers
	}

	matched, _ := c.filterWithTag(invokers, cache, staticPrefix+requested)
	if !matched.IsEmpty() || isForceUseTag(url, invocation) {
		return matched
	}
	return invokers
}
// routeWithDynamicTag routes using the dynamic tag rule, falling back to static tags.
//
// A request without a tag gets every invoker that is not claimed by any dynamic tag. For a tagged request the
// lookup order is:
//  1. the dynamic pool of that tag, when the pool exists and is either non-empty after filtering or the rule is
//     a force rule;
//  2. the static pool of that tag, when it is non-empty or the request forces the use of the tag;
//  3. every invoker that is not claimed by any dynamic tag.
func (c *tagRouter) routeWithDynamicTag(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
	requested := findTag(invocation, url)
	if requested == "" {
		return c.filterNotInDynamicTag(invokers, cache)
	}

	matched, found := c.filterWithTag(invokers, cache, dynamicPrefix+requested)
	if found {
		if !matched.IsEmpty() || isTagRuleForce(cache.FindAddrMeta(c)) {
			return matched
		}
	}

	// Either the dynamic rule has no entry for the requested tag, or the entry filtered down to nothing while
	// force=false. Only an empty result moves on to the static tag.
	if !matched.IsEmpty() {
		return matched
	}

	matched, _ = c.filterWithTag(invokers, cache, staticPrefix+requested)
	// No tagged provider can serve the tagged request. force.tag defaults to false, so the request may go to
	// providers outside the dynamic tags unless that is explicitly disallowed.
	if matched.IsEmpty() && !isForceUseTag(url, invocation) {
		return c.filterNotInDynamicTag(invokers, cache)
	}
	return matched
}
// filterWithTag looks up the pool stored under the given (already prefixed) tag key and joins it with the
// incoming invokers via utils.JoinIfNotEqual.
//
// The boolean result tells whether a pool exists for the key; when it does not, utils.EmptyAddr is returned.
func (c *tagRouter) filterWithTag(invokers *roaring.Bitmap, cache router.Cache, tag string) (*roaring.Bitmap, bool) {
	pools := cache.FindAddrPool(c)
	tagged, exists := pools[tag]
	if !exists {
		return utils.EmptyAddr, false
	}
	return utils.JoinIfNotEqual(tagged, invokers), true
}
// filterNotInDynamicTag returns the incoming invokers minus every invoker that belongs to a dynamic tag pool.
// It is the failover result for requests that cannot be served by a tagged provider. The input bitmap is not
// modified; the result is built on a clone.
func (c *tagRouter) filterNotInDynamicTag(invokers *roaring.Bitmap, cache router.Cache) *roaring.Bitmap {
	remaining := invokers.Clone()
	for key, tagged := range cache.FindAddrPool(c) {
		if !strings.HasPrefix(key, dynamicPrefix) {
			continue
		}
		remaining.AndNot(tagged)
	}
	return remaining
}
// Process handles a change notification for the dynamic tag rule.
//
// A delete event clears the current rule. Any other event is expected to carry the raw rule text as a string,
// which is parsed with getRule and, on success, replaces the current rule. When the value is not a string or
// cannot be parsed, the problem is logged and the current rule is left untouched.
//
// Every modification of the rule, including its removal, happens while holding c.mutex and sets ruleChanged, so
// that ShouldPool reports the change and the snapshot produced by Pool can be rebuilt. Process must therefore not
// be called while c.mutex is already held.
func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
	logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
	if remoting.EventTypeDel == event.ConfigType {
		// The removal is guarded and flagged exactly like an update: Pool reads tagRouterRule under the same
		// mutex, and without the flag the previously pooled dynamic tags would outlive the deleted rule.
		c.mutex.Lock()
		defer c.mutex.Unlock()
		c.tagRouterRule = nil
		c.ruleChanged = true
		return
	}

	content, ok := event.Value.(string)
	if !ok {
		// %v rather than %s: the value is known not to be a string on this path.
		logger.Errorf("Convert event content fail,raw content:[%v] ", event.Value)
		return
	}

	routerRule, err := getRule(content)
	if err != nil {
		logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err)
		return
	}

	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.tagRouterRule = routerRule
	c.ruleChanged = true
}
// URL gets the url of tagRouter, i.e. the router URL that was passed to NewTagRouter.
func (c *tagRouter) URL() *common.URL {
return c.url
}
// Priority gets the priority of tagRouter, as read from the router URL when the router was created.
func (c *tagRouter) Priority() int64 {
return c.priority
}
// Pool divides the invokers into address pools keyed by tag and returns them together with a metadata snapshot.
//
// It first makes sure the dynamic rule of the providers' application has been fetched and is being listened to;
// this happens before the mutex is taken because fetching may call Process, which locks the same mutex. Static
// pools are then built from the invoker URLs and dynamic pools from the current rule. Under the lock the
// ruleChanged flag is cleared and the rule's force/enabled/valid flags are copied into the returned metadata, so
// that Route can work from the snapshot without locking.
func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
	c.fetchRuleIfNecessary(invokers)

	pools := make(router.AddrPool, 8)
	poolWithStaticTag(invokers, pools)

	c.mutex.Lock()
	defer c.mutex.Unlock()

	rule := c.tagRouterRule
	poolWithDynamicTag(invokers, rule, pools)
	c.ruleChanged = false

	// snapshot of what Route needs, so that it never has to take the lock
	snapshot := &addrMetadata{application: c.application}
	if rule != nil {
		snapshot.ruleForce = rule.Force
		snapshot.ruleEnabled = rule.Enabled
		snapshot.ruleValid = rule.Valid
	}
	return pools, snapshot
}
// fetchRuleIfNecessary fetches and parses the dynamic tag rule of the providers' application and registers this
// router as listener for further changes.
//
// The application name is taken from the first invoker's URL. Nothing happens when there are no invokers, when the
// application name is missing, when no dynamic configuration is available, or when the application is the one
// this router already listens to. Otherwise the listener is moved from the previous application's rule key to the
// new one, the rule is loaded from the default group and, if it is non-empty, handed to Process as an update
// event.
func (c *tagRouter) fetchRuleIfNecessary(invokers []protocol.Invoker) {
	if len(invokers) == 0 {
		return
	}

	providerApplication := invokers[0].GetUrl().GetParam(constant.RemoteApplicationKey, "")
	if providerApplication == "" {
		logger.Error("TagRouter must getConfig from or subscribe to a specific application, but the application " +
			"in this TagRouter is not specified.")
		return
	}

	dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration()
	if dynamicConfiguration == nil {
		logger.Error("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please")
		return
	}

	// already listening to this application's rule, nothing to do
	if providerApplication == c.application {
		return
	}

	// switch the listener over from the previous application to the new one
	dynamicConfiguration.RemoveListener(c.application+constant.TagRouterRuleSuffix, c)
	c.application = providerApplication
	routerKey := providerApplication + constant.TagRouterRuleSuffix
	dynamicConfiguration.AddListener(routerKey, c)

	// load the rule as it currently stands
	rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
	if err != nil || len(rule) == 0 {
		logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
		return
	}

	c.Process(&config_center.ConfigChangeEvent{
		Key:        routerKey,
		Value:      rule,
		ConfigType: remoting.EventTypeUpdate,
	})
}
// ShouldPool reports whether the dynamic tag rule has changed since the last call to Pool: the flag is raised by
// Process when it installs a rule and cleared again by Pool. The flag is read under the read lock.
func (c *tagRouter) ShouldPool() bool {
	c.mutex.RLock()
	changed := c.ruleChanged
	c.mutex.RUnlock()
	return changed
}
// Name returns pool's name, which is the constant "tag-router".
func (c *tagRouter) Name() string {
return name
}
// poolWithDynamicTag adds one pool per tag defined in the dynamic tag rule. Each pool is stored under the key
// "dynamic-" + tag name and holds the indexes of the invokers matching the tag's addresses. A nil rule adds
// nothing.
func poolWithDynamicTag(invokers []protocol.Invoker, rule *RouterRule, pool router.AddrPool) {
	if rule != nil {
		for tagName, addresses := range rule.getTagNameToAddresses() {
			pool[dynamicPrefix+tagName] = addrsToBitmap(addresses, invokers)
		}
	}
}
// poolWithStaticTag groups the invokers by the tag parameter found on their URLs. The index of every tagged
// invoker is added to the pool stored under "static-" + tag, which is created on first use. Invokers without a
// tag are not pooled.
func poolWithStaticTag(invokers []protocol.Invoker, pool router.AddrPool) {
	for idx, invoker := range invokers {
		tag := invoker.GetUrl().GetParam(constant.Tagkey, "")
		if tag == "" {
			continue
		}

		key := staticPrefix + tag
		members, exists := pool[key]
		if !exists {
			members = roaring.NewBitmap()
			pool[key] = members
		}
		members.AddInt(idx)
	}
}
// shouldUseDynamicTag decides from the metadata snapshot whether the dynamic tag rule applies: the rule must be
// both valid and enabled. The metadata must be an *addrMetadata; any other value makes the type assertion panic.
func shouldUseDynamicTag(meta router.AddrMetadata) bool {
	snapshot := meta.(*addrMetadata)
	return snapshot.ruleValid && snapshot.ruleEnabled
}
// isTagRuleForce tells from the metadata snapshot whether the dynamic tag rule is a force rule. The metadata must
// be an *addrMetadata; any other value makes the type assertion panic.
func isTagRuleForce(meta router.AddrMetadata) bool {
	snapshot := meta.(*addrMetadata)
	return snapshot.ruleForce
}
// isForceUseTag reports whether the request insists on being served by providers carrying its tag.
//
// The flag is looked up in the invocation attachments, with the URL parameter of the same key (itself defaulting
// to "false") handed over as the default value. A value that cannot be parsed as a boolean counts as false.
func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool {
	fallback := url.GetParam(constant.ForceUseTag, "false")
	raw := invocation.AttachmentsByKey(constant.ForceUseTag, fallback)

	force, err := strconv.ParseBool(raw)
	if err != nil {
		return false
	}
	return force
}
// addrsToBitmap converts the addresses of one dynamic tag into the bitmap of the indexes of the matching
// invokers.
//
// If any address is the "any host" address (see isAnyHost), the indexes of all invokers are returned right away.
// Otherwise each address is matched against the invoker list and the index of every invoker it matches is added.
// One address may select several invokers (for instance several providers on the same host; presumably also
// wildcard or subnet patterns, depending on what gxnet.MatchIP accepts - confirm against its documentation), so
// the search resumes after each hit instead of stopping at the first one. Addresses that match no invoker
// contribute nothing.
func addrsToBitmap(addrs []string, invokers []protocol.Invoker) *roaring.Bitmap {
	ret := roaring.NewBitmap()
	for _, addr := range addrs {
		if isAnyHost(addr) {
			ret.AddRange(0, uint64(len(invokers)))
			return ret
		}

		// findIndexWithIp only reports the first match within the slice it is given, so keep searching the
		// remainder of the list until it reports no further match.
		for offset := 0; offset < len(invokers); {
			index := findIndexWithIp(addr, invokers[offset:])
			if index == -1 {
				break
			}
			ret.AddInt(offset + index)
			offset += index + 1
		}
	}
	return ret
}
// findIndexWithIp returns the index of the first invoker whose URL IP and port match the given address according
// to gxnet.MatchIP, or -1 when no invoker matches.
func findIndexWithIp(addr string, invokers []protocol.Invoker) int {
	for idx := range invokers {
		target := invokers[idx].GetUrl()
		if gxnet.MatchIP(addr, target.Ip, target.Port) {
			return idx
		}
	}
	return -1
}
// isAnyHost checks if an address designates any host, i.e. whether it starts with constant.ANYHOST_VALUE
// (described as '0.0.0.0' by the callers' documentation). This is a prefix check, so an address that continues
// after that value is accepted as well.
func isAnyHost(addr string) bool {
return strings.HasPrefix(addr, constant.ANYHOST_VALUE)
}
// findTag determines the tag of a request. The invocation attachment takes precedence: when the tag key is
// present there, its value is returned if it is a string and "" otherwise. Only when the attachment is absent is
// the tag parameter of the consumer URL used, which defaults to "".
func findTag(invocation protocol.Invocation, consumerUrl *common.URL) string {
	attached, present := invocation.Attachments()[constant.Tagkey]
	if !present {
		return consumerUrl.GetParam(constant.Tagkey, "")
	}

	// a non-string attachment yields the zero value, i.e. no tag
	tag, _ := attached.(string)
	return tag
}