| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| */ |
| |
| package resolver |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "math/bits" |
| "strings" |
| "sync/atomic" |
| "time" |
| ) |
| |
| import ( |
| xxhash "github.com/cespare/xxhash/v2" |
| |
| "google.golang.org/grpc/codes" |
| |
| "google.golang.org/grpc/metadata" |
| |
| "google.golang.org/grpc/status" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/xds/balancer/clustermanager" |
| "dubbo.apache.org/dubbo-go/v3/xds/balancer/ringhash" |
| "dubbo.apache.org/dubbo-go/v3/xds/client/resource" |
| "dubbo.apache.org/dubbo-go/v3/xds/httpfilter" |
| "dubbo.apache.org/dubbo-go/v3/xds/httpfilter/router" |
| "dubbo.apache.org/dubbo-go/v3/xds/utils/envconfig" |
| "dubbo.apache.org/dubbo-go/v3/xds/utils/grpcrand" |
| iresolver "dubbo.apache.org/dubbo-go/v3/xds/utils/resolver" |
| "dubbo.apache.org/dubbo-go/v3/xds/utils/serviceconfig" |
| "dubbo.apache.org/dubbo-go/v3/xds/utils/wrr" |
| ) |
| |
| const ( |
| cdsName = "cds_experimental" |
| xdsClusterManagerName = "xds_cluster_manager_experimental" |
| clusterPrefix = "cluster:" |
| clusterSpecifierPluginPrefix = "cluster_specifier_plugin:" |
| ) |
| |
| type serviceConfig struct { |
| LoadBalancingConfig balancerConfig `json:"loadBalancingConfig"` |
| } |
| |
| type balancerConfig []map[string]interface{} |
| |
| func newBalancerConfig(name string, config interface{}) balancerConfig { |
| return []map[string]interface{}{{name: config}} |
| } |
| |
| type cdsBalancerConfig struct { |
| Cluster string `json:"cluster"` |
| } |
| |
| type xdsChildConfig struct { |
| ChildPolicy balancerConfig `json:"childPolicy"` |
| } |
| |
| type xdsClusterManagerConfig struct { |
| Children map[string]xdsChildConfig `json:"children"` |
| } |
| |
| // pruneActiveClusters deletes entries in r.activeClusters with zero |
| // references. |
| func (r *xdsResolver) pruneActiveClusters() { |
| for cluster, ci := range r.activeClusters { |
| if atomic.LoadInt32(&ci.refCount) == 0 { |
| delete(r.activeClusters, cluster) |
| } |
| } |
| } |
| |
| // serviceConfigJSON produces a service config in JSON format representing all |
| // the clusters referenced in activeClusters. This includes clusters with zero |
| // references, so they must be pruned first. |
| func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) { |
| // Generate children (all entries in activeClusters). |
| children := make(map[string]xdsChildConfig) |
| for cluster, ci := range activeClusters { |
| children[cluster] = ci.cfg |
| } |
| |
| sc := serviceConfig{ |
| LoadBalancingConfig: newBalancerConfig( |
| xdsClusterManagerName, xdsClusterManagerConfig{Children: children}, |
| ), |
| } |
| |
| bs, err := json.Marshal(sc) |
| if err != nil { |
| return nil, fmt.Errorf("failed to marshal json: %v", err) |
| } |
| return bs, nil |
| } |
| |
| type virtualHost struct { |
| // map from filter name to its config |
| httpFilterConfigOverride map[string]httpfilter.FilterConfig |
| // retry policy present in virtual host |
| retryConfig *resource.RetryConfig |
| } |
| |
| // routeCluster holds information about a cluster as referenced by a route. |
| type routeCluster struct { |
| name string |
| // map from filter name to its config |
| httpFilterConfigOverride map[string]httpfilter.FilterConfig |
| } |
| |
| type route struct { |
| m *resource.CompositeMatcher // converted from route matchers |
| clusters wrr.WRR // holds *routeCluster entries |
| maxStreamDuration time.Duration |
| // map from filter name to its config |
| httpFilterConfigOverride map[string]httpfilter.FilterConfig |
| retryConfig *resource.RetryConfig |
| hashPolicies []*resource.HashPolicy |
| } |
| |
| func (r route) String() string { |
| return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration) |
| } |
| |
| type configSelector struct { |
| r *xdsResolver |
| virtualHost virtualHost |
| routes []route |
| clusters map[string]*clusterInfo |
| httpFilterConfig []resource.HTTPFilter |
| } |
| |
| var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") |
| |
| func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) { |
| if cs == nil { |
| return nil, status.Errorf(codes.Unavailable, "no valid clusters") |
| } |
| var rt *route |
| // Loop through routes in order and select first match. |
| for _, r := range cs.routes { |
| if r.m.Match(rpcInfo) { |
| rt = &r |
| break |
| } |
| } |
| if rt == nil || rt.clusters == nil { |
| return nil, errNoMatchedRouteFound |
| } |
| |
| cluster, ok := rt.clusters.Next().(*routeCluster) |
| if !ok { |
| return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) |
| } |
| |
| // Add a ref to the selected cluster, as this RPC needs this cluster until |
| // it is committed. |
| ref := &cs.clusters[cluster.name].refCount |
| atomic.AddInt32(ref, 1) |
| |
| interceptor, err := cs.newInterceptor(rt, cluster) |
| if err != nil { |
| return nil, err |
| } |
| |
| lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name) |
| // Request Hashes are only applicable for a Ring Hash LB. |
| if envconfig.XDSRingHash { |
| lbCtx = ringhash.SetRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies)) |
| } |
| |
| config := &iresolver.RPCConfig{ |
| // Communicate to the LB policy the chosen cluster and request hash, if Ring Hash LB policy. |
| Context: lbCtx, |
| OnCommitted: func() { |
| // When the RPC is committed, the cluster is no longer required. |
| // Decrease its ref. |
| if v := atomic.AddInt32(ref, -1); v == 0 { |
| // This entry will be removed from activeClusters when |
| // producing the service config for the empty update. |
| select { |
| case cs.r.updateCh <- suWithError{emptyUpdate: true}: |
| default: |
| } |
| } |
| }, |
| Interceptor: interceptor, |
| } |
| |
| if rt.maxStreamDuration != 0 { |
| config.MethodConfig.Timeout = &rt.maxStreamDuration |
| } |
| if rt.retryConfig != nil { |
| config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig) |
| } else if cs.virtualHost.retryConfig != nil { |
| config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig) |
| } |
| |
| return config, nil |
| } |
| |
| func retryConfigToPolicy(config *resource.RetryConfig) *serviceconfig.RetryPolicy { |
| return &serviceconfig.RetryPolicy{ |
| MaxAttempts: int(config.NumRetries) + 1, |
| InitialBackoff: config.RetryBackoff.BaseInterval, |
| MaxBackoff: config.RetryBackoff.MaxInterval, |
| BackoffMultiplier: 2, |
| RetryableStatusCodes: config.RetryOn, |
| } |
| } |
| |
| func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*resource.HashPolicy) uint64 { |
| var hash uint64 |
| var generatedHash bool |
| for _, policy := range hashPolicies { |
| var policyHash uint64 |
| var generatedPolicyHash bool |
| switch policy.HashPolicyType { |
| case resource.HashPolicyTypeHeader: |
| md, ok := metadata.FromOutgoingContext(rpcInfo.Context) |
| if !ok { |
| continue |
| } |
| values := md.Get(policy.HeaderName) |
| // If the header isn't present, no-op. |
| if len(values) == 0 { |
| continue |
| } |
| joinedValues := strings.Join(values, ",") |
| if policy.Regex != nil { |
| joinedValues = policy.Regex.ReplaceAllString(joinedValues, policy.RegexSubstitution) |
| } |
| policyHash = xxhash.Sum64String(joinedValues) |
| generatedHash = true |
| generatedPolicyHash = true |
| case resource.HashPolicyTypeChannelID: |
| // Hash the ClientConn pointer which logically uniquely |
| // identifies the client. |
| policyHash = xxhash.Sum64String(fmt.Sprintf("%p", &cs.r.cc)) |
| generatedHash = true |
| generatedPolicyHash = true |
| } |
| |
| // Deterministically combine the hash policies. Rotating prevents |
| // duplicate hash policies from canceling each other out and preserves |
| // the 64 bits of entropy. |
| if generatedPolicyHash { |
| hash = bits.RotateLeft64(hash, 1) |
| hash = hash ^ policyHash |
| } |
| |
| // If terminal policy and a hash has already been generated, ignore the |
| // rest of the policies and use that hash already generated. |
| if policy.Terminal && generatedHash { |
| break |
| } |
| } |
| |
| if generatedHash { |
| return hash |
| } |
| // If no generated hash return a random long. In the grand scheme of things |
| // this logically will map to choosing a random backend to route request to. |
| return grpcrand.Uint64() |
| } |
| |
| func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) { |
| if len(cs.httpFilterConfig) == 0 { |
| return nil, nil |
| } |
| interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig)) |
| for _, filter := range cs.httpFilterConfig { |
| if router.IsRouterFilter(filter.Filter) { |
| // Ignore any filters after the router filter. The router itself |
| // is currently a nop. |
| return &interceptorList{interceptors: interceptors}, nil |
| } |
| override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority |
| if override == nil { |
| override = rt.httpFilterConfigOverride[filter.Name] // route is second priority |
| } |
| if override == nil { |
| override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority |
| } |
| ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) |
| if !ok { |
| // Should not happen if it passed xdsClient validation. |
| return nil, fmt.Errorf("filter does not support use in client") |
| } |
| i, err := ib.BuildClientInterceptor(filter.Config, override) |
| if err != nil { |
| return nil, fmt.Errorf("error constructing filter: %v", err) |
| } |
| if i != nil { |
| interceptors = append(interceptors, i) |
| } |
| } |
| return nil, fmt.Errorf("error in xds config: no router filter present") |
| } |
| |
| // stop decrements refs of all clusters referenced by this config selector. |
| func (cs *configSelector) stop() { |
| // The resolver's old configSelector may be nil. Handle that here. |
| if cs == nil { |
| return |
| } |
| // If any refs drop to zero, we'll need a service config update to delete |
| // the cluster. |
| needUpdate := false |
| // Loops over cs.clusters, but these are pointers to entries in |
| // activeClusters. |
| for _, ci := range cs.clusters { |
| if v := atomic.AddInt32(&ci.refCount, -1); v == 0 { |
| needUpdate = true |
| } |
| } |
| // We stop the old config selector immediately after sending a new config |
| // selector; we need another update to delete clusters from the config (if |
| // we don't have another update pending already). |
| if needUpdate { |
| select { |
| case cs.r.updateCh <- suWithError{emptyUpdate: true}: |
| default: |
| } |
| } |
| } |
| |
| // A global for testing. |
| var newWRR = wrr.NewRandom |
| |
| // newConfigSelector creates the config selector for su; may add entries to |
| // r.activeClusters for previously-unseen clusters. |
| func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) { |
| cs := &configSelector{ |
| r: r, |
| virtualHost: virtualHost{ |
| httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride, |
| retryConfig: su.virtualHost.RetryConfig, |
| }, |
| routes: make([]route, len(su.virtualHost.Routes)), |
| clusters: make(map[string]*clusterInfo), |
| httpFilterConfig: su.ldsConfig.httpFilterConfig, |
| } |
| |
| for i, rt := range su.virtualHost.Routes { |
| clusters := newWRR() |
| if rt.ClusterSpecifierPlugin != "" { |
| clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin |
| clusters.Add(&routeCluster{ |
| name: clusterName, |
| }, 1) |
| cs.initializeCluster(clusterName, xdsChildConfig{ |
| ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]), |
| }) |
| } else { |
| for cluster, wc := range rt.WeightedClusters { |
| clusterName := clusterPrefix + cluster |
| clusters.Add(&routeCluster{ |
| name: clusterName, |
| httpFilterConfigOverride: wc.HTTPFilterConfigOverride, |
| }, int64(wc.Weight)) |
| cs.initializeCluster(clusterName, xdsChildConfig{ |
| ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}), |
| }) |
| } |
| } |
| cs.routes[i].clusters = clusters |
| |
| var err error |
| cs.routes[i].m, err = resource.RouteToMatcher(rt) |
| if err != nil { |
| return nil, err |
| } |
| if rt.MaxStreamDuration == nil { |
| cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration |
| } else { |
| cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration |
| } |
| |
| cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride |
| cs.routes[i].retryConfig = rt.RetryConfig |
| cs.routes[i].hashPolicies = rt.HashPolicies |
| } |
| |
| // Account for this config selector's clusters. Do this after no further |
| // errors may occur. Note: cs.clusters are pointers to entries in |
| // activeClusters. |
| for _, ci := range cs.clusters { |
| atomic.AddInt32(&ci.refCount, 1) |
| } |
| |
| return cs, nil |
| } |
| |
| // initializeCluster initializes entries in cs.clusters map, creating entries in |
| // r.activeClusters as necessary. Any created entries will have a ref count set |
| // to zero as their ref count will be incremented by incRefs. |
| func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) { |
| ci := cs.r.activeClusters[clusterName] |
| if ci == nil { |
| ci = &clusterInfo{refCount: 0} |
| cs.r.activeClusters[clusterName] = ci |
| } |
| cs.clusters[clusterName] = ci |
| cs.clusters[clusterName].cfg = cfg |
| } |
| |
| type clusterInfo struct { |
| // number of references to this cluster; accessed atomically |
| refCount int32 |
| // cfg is the child configuration for this cluster, containing either the |
| // csp config or the cds cluster config. |
| cfg xdsChildConfig |
| } |
| |
| type interceptorList struct { |
| interceptors []iresolver.ClientInterceptor |
| } |
| |
| func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { |
| for i := len(il.interceptors) - 1; i >= 0; i-- { |
| ns := newStream |
| interceptor := il.interceptors[i] |
| newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { |
| return interceptor.NewStream(ctx, ri, done, ns) |
| } |
| } |
| return newStream(ctx, func() {}) |
| } |