/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2020 gRPC authors.
*
*/
package resolver
import (
"context"
"encoding/json"
"fmt"
"math/bits"
"strings"
"sync/atomic"
"time"
)
import (
xxhash "github.com/cespare/xxhash/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
import (
"dubbo.apache.org/dubbo-go/v3/xds/balancer/clustermanager"
"dubbo.apache.org/dubbo-go/v3/xds/balancer/ringhash"
"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
"dubbo.apache.org/dubbo-go/v3/xds/httpfilter"
"dubbo.apache.org/dubbo-go/v3/xds/httpfilter/router"
"dubbo.apache.org/dubbo-go/v3/xds/utils/envconfig"
"dubbo.apache.org/dubbo-go/v3/xds/utils/grpcrand"
iresolver "dubbo.apache.org/dubbo-go/v3/xds/utils/resolver"
"dubbo.apache.org/dubbo-go/v3/xds/utils/serviceconfig"
"dubbo.apache.org/dubbo-go/v3/xds/utils/wrr"
)
const (
cdsName = "cds_experimental"
xdsClusterManagerName = "xds_cluster_manager_experimental"
clusterPrefix = "cluster:"
clusterSpecifierPluginPrefix = "cluster_specifier_plugin:"
)
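// For illustration: a route that selects an xDS cluster named "backend" is
// tracked under the child name "cluster:backend", while a route that uses a
// cluster specifier plugin named "csp_a" (both names are just examples) is
// tracked under "cluster_specifier_plugin:csp_a". The prefixes keep the two
// namespaces from colliding in the cluster_manager's children map and in
// activeClusters.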
type serviceConfig struct {
LoadBalancingConfig balancerConfig `json:"loadBalancingConfig"`
}
type balancerConfig []map[string]interface{}
func newBalancerConfig(name string, config interface{}) balancerConfig {
return []map[string]interface{}{{name: config}}
}
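// As a rough sketch of the shape this produces (with "backend" as a placeholder
// cluster name), newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: "backend"})
// marshals to the single-element list form used by loadBalancingConfig:
//
//	[{"cds_experimental": {"cluster": "backend"}}]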
type cdsBalancerConfig struct {
Cluster string `json:"cluster"`
}
type xdsChildConfig struct {
ChildPolicy balancerConfig `json:"childPolicy"`
}
type xdsClusterManagerConfig struct {
Children map[string]xdsChildConfig `json:"children"`
}
// pruneActiveClusters deletes entries in r.activeClusters with zero
// references.
func (r *xdsResolver) pruneActiveClusters() {
for cluster, ci := range r.activeClusters {
if atomic.LoadInt32(&ci.refCount) == 0 {
delete(r.activeClusters, cluster)
}
}
}
// serviceConfigJSON produces a service config in JSON format representing all
// the clusters referenced in activeClusters. This includes clusters with zero
// references, so they must be pruned first.
func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
// Generate children (all entries in activeClusters).
children := make(map[string]xdsChildConfig)
for cluster, ci := range activeClusters {
children[cluster] = ci.cfg
}
sc := serviceConfig{
LoadBalancingConfig: newBalancerConfig(
xdsClusterManagerName, xdsClusterManagerConfig{Children: children},
),
}
bs, err := json.Marshal(sc)
if err != nil {
return nil, fmt.Errorf("failed to marshal json: %v", err)
}
return bs, nil
}
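// As an illustrative example (not an exact rendering), a single active cluster
// named "cluster:backend" would produce a service config shaped roughly like:
//
//	{
//	  "loadBalancingConfig": [{
//	    "xds_cluster_manager_experimental": {
//	      "children": {
//	        "cluster:backend": {
//	          "childPolicy": [{"cds_experimental": {"cluster": "backend"}}]
//	        }
//	      }
//	    }
//	  }]
//	}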
type virtualHost struct {
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
// retry policy present in virtual host
retryConfig *resource.RetryConfig
}
// routeCluster holds information about a cluster as referenced by a route.
type routeCluster struct {
name string
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
}
type route struct {
m *resource.CompositeMatcher // converted from route matchers
clusters wrr.WRR // holds *routeCluster entries
maxStreamDuration time.Duration
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
retryConfig *resource.RetryConfig
hashPolicies []*resource.HashPolicy
}
func (r route) String() string {
return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration)
}
type configSelector struct {
r *xdsResolver
virtualHost virtualHost
routes []route
clusters map[string]*clusterInfo
httpFilterConfig []resource.HTTPFilter
}
var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")
func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
if cs == nil {
return nil, status.Errorf(codes.Unavailable, "no valid clusters")
}
var rt *route
// Loop through routes in order and select first match.
for _, r := range cs.routes {
if r.m.Match(rpcInfo) {
rt = &r
break
}
}
if rt == nil || rt.clusters == nil {
return nil, errNoMatchedRouteFound
}
cluster, ok := rt.clusters.Next().(*routeCluster)
if !ok {
return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
}
// Add a ref to the selected cluster, as this RPC needs this cluster until
// it is committed.
ref := &cs.clusters[cluster.name].refCount
atomic.AddInt32(ref, 1)
interceptor, err := cs.newInterceptor(rt, cluster)
if err != nil {
return nil, err
}
lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name)
// Request hashes are only applicable when the ring_hash LB policy is in use.
if envconfig.XDSRingHash {
lbCtx = ringhash.SetRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies))
}
config := &iresolver.RPCConfig{
// Communicate the chosen cluster to the LB policy, along with the
// request hash when the ring_hash LB policy is in use.
Context: lbCtx,
OnCommitted: func() {
// When the RPC is committed, the cluster is no longer required.
// Decrease its ref.
if v := atomic.AddInt32(ref, -1); v == 0 {
// This entry will be removed from activeClusters when
// producing the service config for the empty update.
select {
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
default:
}
}
},
Interceptor: interceptor,
}
if rt.maxStreamDuration != 0 {
config.MethodConfig.Timeout = &rt.maxStreamDuration
}
if rt.retryConfig != nil {
config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig)
} else if cs.virtualHost.retryConfig != nil {
config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig)
}
return config, nil
}
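// A sketch of the cluster ref-count lifecycle implemented by SelectConfig above:
//
//	SelectConfig          -> +1 for the cluster picked for the RPC
//	RPCConfig.OnCommitted -> -1 once the RPC no longer needs the cluster
//	newConfigSelector     -> +1 for every cluster the selector references
//	configSelector.stop   -> -1 for every cluster the selector references
//
// Whenever a count drops to zero, an emptyUpdate is pushed onto r.updateCh so
// the service config is regenerated and the entry pruned from activeClusters.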
func retryConfigToPolicy(config *resource.RetryConfig) *serviceconfig.RetryPolicy {
return &serviceconfig.RetryPolicy{
MaxAttempts: int(config.NumRetries) + 1,
InitialBackoff: config.RetryBackoff.BaseInterval,
MaxBackoff: config.RetryBackoff.MaxInterval,
BackoffMultiplier: 2,
RetryableStatusCodes: config.RetryOn,
}
}
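// As a worked example of the mapping in retryConfigToPolicy above: an xDS retry
// policy with NumRetries of 2 maps to MaxAttempts of 3 (the original attempt
// plus two retries). The backoff multiplier is fixed at 2 because the xDS retry
// policy carries only base and max backoff intervals, not a multiplier.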
func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*resource.HashPolicy) uint64 {
var hash uint64
var generatedHash bool
for _, policy := range hashPolicies {
var policyHash uint64
var generatedPolicyHash bool
switch policy.HashPolicyType {
case resource.HashPolicyTypeHeader:
md, ok := metadata.FromOutgoingContext(rpcInfo.Context)
if !ok {
continue
}
values := md.Get(policy.HeaderName)
// If the header isn't present, no-op.
if len(values) == 0 {
continue
}
joinedValues := strings.Join(values, ",")
if policy.Regex != nil {
joinedValues = policy.Regex.ReplaceAllString(joinedValues, policy.RegexSubstitution)
}
policyHash = xxhash.Sum64String(joinedValues)
generatedHash = true
generatedPolicyHash = true
case resource.HashPolicyTypeChannelID:
// Hash the ClientConn pointer which logically uniquely
// identifies the client.
policyHash = xxhash.Sum64String(fmt.Sprintf("%p", &cs.r.cc))
generatedHash = true
generatedPolicyHash = true
}
// Deterministically combine the hash policies. Rotating prevents
// duplicate hash policies from canceling each other out and preserves
// the 64 bits of entropy.
if generatedPolicyHash {
hash = bits.RotateLeft64(hash, 1)
hash = hash ^ policyHash
}
// If the policy is terminal and a hash has already been generated, ignore
// the remaining policies and use the hash generated so far.
if policy.Terminal && generatedHash {
break
}
}
if generatedHash {
return hash
}
// If no hash was generated, return a random uint64. In practice, this maps
// the request to a randomly chosen backend.
return grpcrand.Uint64()
}
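// A small illustration of the combining step in generateHash: with two matching
// header policies whose individual hashes are h1 and h2, the result is
//
//	bits.RotateLeft64(h1, 1) ^ h2
//
// computed in policy order (the initial hash of 0 makes the first rotation a
// no-op). A terminal policy stops the loop as soon as any hash has been
// generated, so later policies are ignored.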
func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
if len(cs.httpFilterConfig) == 0 {
return nil, nil
}
interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig))
for _, filter := range cs.httpFilterConfig {
if router.IsRouterFilter(filter.Filter) {
// Ignore any filters after the router filter. The router itself
// is currently a nop.
return &interceptorList{interceptors: interceptors}, nil
}
override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority
if override == nil {
override = rt.httpFilterConfigOverride[filter.Name] // route is second priority
}
if override == nil {
override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority
}
ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
if !ok {
// Should not happen if it passed xdsClient validation.
return nil, fmt.Errorf("filter does not support use in client")
}
i, err := ib.BuildClientInterceptor(filter.Config, override)
if err != nil {
return nil, fmt.Errorf("error constructing filter: %v", err)
}
if i != nil {
interceptors = append(interceptors, i)
}
}
return nil, fmt.Errorf("error in xds config: no router filter present")
}
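// As a hypothetical example of the chain built by newInterceptor above: an HTTP
// filter list of [some_custom_filter, router] yields an interceptorList with a
// single interceptor for some_custom_filter, its config override chosen with
// cluster taking precedence over route, and route over virtual host. A filter
// chain that does not end in a router filter is rejected as invalid xDS config.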
// stop decrements refs of all clusters referenced by this config selector.
func (cs *configSelector) stop() {
// The resolver's old configSelector may be nil. Handle that here.
if cs == nil {
return
}
// If any refs drop to zero, we'll need a service config update to delete
// the cluster.
needUpdate := false
// Loops over cs.clusters, but these are pointers to entries in
// activeClusters.
for _, ci := range cs.clusters {
if v := atomic.AddInt32(&ci.refCount, -1); v == 0 {
needUpdate = true
}
}
// We stop the old config selector immediately after sending a new config
// selector; we need another update to delete clusters from the config (if
// we don't have another update pending already).
if needUpdate {
select {
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
default:
}
}
}
// newWRR is a package-level variable so that tests can override the WRR
// implementation used for route cluster selection.
var newWRR = wrr.NewRandom
// newConfigSelector creates the config selector for su; may add entries to
// r.activeClusters for previously-unseen clusters.
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
cs := &configSelector{
r: r,
virtualHost: virtualHost{
httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
retryConfig: su.virtualHost.RetryConfig,
},
routes: make([]route, len(su.virtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
httpFilterConfig: su.ldsConfig.httpFilterConfig,
}
for i, rt := range su.virtualHost.Routes {
clusters := newWRR()
if rt.ClusterSpecifierPlugin != "" {
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
clusters.Add(&routeCluster{
name: clusterName,
}, 1)
cs.initializeCluster(clusterName, xdsChildConfig{
ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]),
})
} else {
for cluster, wc := range rt.WeightedClusters {
clusterName := clusterPrefix + cluster
clusters.Add(&routeCluster{
name: clusterName,
httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
}, int64(wc.Weight))
cs.initializeCluster(clusterName, xdsChildConfig{
ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}),
})
}
}
cs.routes[i].clusters = clusters
var err error
cs.routes[i].m, err = resource.RouteToMatcher(rt)
if err != nil {
return nil, err
}
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
} else {
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
}
cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
cs.routes[i].retryConfig = rt.RetryConfig
cs.routes[i].hashPolicies = rt.HashPolicies
}
// Account for this config selector's clusters. Do this after no further
// errors may occur. Note: cs.clusters are pointers to entries in
// activeClusters.
for _, ci := range cs.clusters {
atomic.AddInt32(&ci.refCount, 1)
}
return cs, nil
}
// initializeCluster initializes the entry for clusterName in the cs.clusters
// map, creating an entry in r.activeClusters if one does not exist. Any created
// entry starts with a ref count of zero; newConfigSelector increments the refs
// for all of the selector's clusters once it is fully built.
func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) {
ci := cs.r.activeClusters[clusterName]
if ci == nil {
ci = &clusterInfo{refCount: 0}
cs.r.activeClusters[clusterName] = ci
}
cs.clusters[clusterName] = ci
cs.clusters[clusterName].cfg = cfg
}
type clusterInfo struct {
// number of references to this cluster; accessed atomically
refCount int32
// cfg is the child configuration for this cluster, containing either the
// csp config or the cds cluster config.
cfg xdsChildConfig
}
type interceptorList struct {
interceptors []iresolver.ClientInterceptor
}
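// NewStream implements iresolver.ClientInterceptor. It wraps newStream with the
// interceptors in reverse slice order, so il.interceptors[0] ends up outermost
// and interceptors run in the order the HTTP filters were configured.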
func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
for i := len(il.interceptors) - 1; i >= 0; i-- {
ns := newStream
interceptor := il.interceptors[i]
newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return interceptor.NewStream(ctx, ri, done, ns)
}
}
return newStream(ctx, func() {})
}