| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /* |
| * |
| * Copyright 2021 gRPC authors. |
| * |
| */ |
| |
| package clusterresolver |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| "sort" |
| ) |
| |
| import ( |
| dubbogoLogger "github.com/dubbogo/gost/log/logger" |
| |
| "google.golang.org/grpc/balancer/roundrobin" |
| "google.golang.org/grpc/balancer/weightedroundrobin" |
| "google.golang.org/grpc/balancer/weightedtarget" |
| |
| "google.golang.org/grpc/resolver" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/xds/balancer/clusterimpl" |
| "dubbo.apache.org/dubbo-go/v3/xds/balancer/priority" |
| "dubbo.apache.org/dubbo-go/v3/xds/balancer/ringhash" |
| "dubbo.apache.org/dubbo-go/v3/xds/client/resource" |
| "dubbo.apache.org/dubbo-go/v3/xds/utils/hierarchy" |
| internalserviceconfig "dubbo.apache.org/dubbo-go/v3/xds/utils/serviceconfig" |
| ) |
| |
| const million = 1000000 |
| |
| // priorityConfig is config for one priority. For example, if there an EDS and a |
| // DNS, the priority list will be [priorityConfig{EDS}, priorityConfig{DNS}]. |
| // |
| // Each priorityConfig corresponds to one discovery mechanism from the LBConfig |
| // generated by the CDS balancer. The CDS balancer resolves the cluster name to |
| // an ordered list of discovery mechanisms (if the top cluster is an aggregated |
| // cluster), one for each underlying cluster. |
| type priorityConfig struct { |
| mechanism DiscoveryMechanism |
| // edsResp is set only if type is EDS. |
| edsResp resource.EndpointsUpdate |
| // addresses is set only if type is DNS. |
| addresses []string |
| } |
| |
| // buildPriorityConfigJSON builds balancer config for the passed in |
| // priorities. |
| // |
| // The built tree of balancers (see test for the output struct). |
| // |
| // If xds lb policy is ROUND_ROBIN, the children will be weighted_target for |
| // locality picking, and round_robin for endpoint picking. |
| // |
| // ┌────────┐ |
| // │priority│ |
| // └┬──────┬┘ |
| // │ │ |
| // ┌───────────▼┐ ┌▼───────────┐ |
| // │cluster_impl│ │cluster_impl│ |
| // └─┬──────────┘ └──────────┬─┘ |
| // │ │ |
| // ┌──────────────▼─┐ ┌─▼──────────────┐ |
| // │locality_picking│ │locality_picking│ |
| // └┬──────────────┬┘ └┬──────────────┬┘ |
| // │ │ │ │ |
| // ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ |
| // │LRS│ │LRS│ │LRS│ │LRS│ |
| // └─┬─┘ └─┬─┘ └─┬─┘ └─┬─┘ |
| // │ │ │ │ |
| // |
| // ┌──────────▼─────┐ ┌─────▼──────────┐ ┌──────────▼─────┐ ┌─────▼──────────┐ |
| // │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ |
| // └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘ |
| // |
| // If xds lb policy is RING_HASH, the children will be just a ring_hash policy. |
| // The endpoints from all localities will be flattened to one addresses list, |
| // and the ring_hash policy will pick endpoints from it. |
| // |
| // ┌────────┐ |
| // │priority│ |
| // └┬──────┬┘ |
| // │ │ |
| // |
| // ┌──────────▼─┐ ┌─▼──────────┐ |
| // │cluster_impl│ │cluster_impl│ |
| // └──────┬─────┘ └─────┬──────┘ |
| // |
| // │ │ |
| // |
| // ┌──────▼─────┐ ┌─────▼──────┐ |
| // │ ring_hash │ │ ring_hash │ |
| // └────────────┘ └────────────┘ |
| // |
| // If endpointPickingPolicy is nil, roundrobin will be used. |
| // |
| // Custom locality picking policy isn't support, and weighted_target is always |
| // used. |
| func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) { |
| pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy) |
| if err != nil { |
| return nil, nil, fmt.Errorf("failed to build priority config: %v", err) |
| } |
| ret, err := json.Marshal(pc) |
| if err != nil { |
| return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err) |
| } |
| return ret, addrs, nil |
| } |
| |
| func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) { |
| var ( |
| retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} |
| retAddrs []resolver.Address |
| ) |
| for i, p := range priorities { |
| switch p.mechanism.Type { |
| case DiscoveryMechanismTypeEDS: |
| names, configs, addrs, err := buildClusterImplConfigForEDS(i, p.edsResp, p.mechanism, xdsLBPolicy) |
| if err != nil { |
| return nil, nil, err |
| } |
| retConfig.Priorities = append(retConfig.Priorities, names...) |
| for n, c := range configs { |
| retConfig.Children[n] = &priority.Child{ |
| Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c}, |
| // Ignore all re-resolution from EDS children. |
| IgnoreReresolutionRequests: true, |
| } |
| } |
| retAddrs = append(retAddrs, addrs...) |
| case DiscoveryMechanismTypeLogicalDNS: |
| name, config, addrs := buildClusterImplConfigForDNS(i, p.addresses) |
| retConfig.Priorities = append(retConfig.Priorities, name) |
| retConfig.Children[name] = &priority.Child{ |
| Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config}, |
| // Not ignore re-resolution from DNS children, they will trigger |
| // DNS to re-resolve. |
| IgnoreReresolutionRequests: false, |
| } |
| retAddrs = append(retAddrs, addrs...) |
| } |
| } |
| return retConfig, retAddrs, nil |
| } |
| |
| func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string) (string, *clusterimpl.LBConfig, []resolver.Address) { |
| // Endpoint picking policy for DNS is hardcoded to pick_first. |
| const childPolicy = "pick_first" |
| retAddrs := make([]resolver.Address, 0, len(addrStrs)) |
| pName := fmt.Sprintf("priority-%v", parentPriority) |
| for _, addrStr := range addrStrs { |
| retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName})) |
| } |
| return pName, &clusterimpl.LBConfig{ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}}, retAddrs |
| } |
| |
| // buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for |
| // each priority, sorted by priority, and the addresses for each priority (with |
| // hierarchy attributes set). |
| // |
| // For example, if there are two priorities, the returned values will be |
| // - ["p0", "p1"] |
| // - map{"p0":p0_config, "p1":p1_config} |
| // - [p0_address_0, p0_address_1, p1_address_0, p1_address_1] |
| // - p0 addresses' hierarchy attributes are set to p0 |
| func buildClusterImplConfigForEDS(parentPriority int, edsResp resource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) { |
| drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops)) |
| for _, d := range edsResp.Drops { |
| drops = append(drops, clusterimpl.DropConfig{ |
| Category: d.Category, |
| RequestsPerMillion: d.Numerator * million / d.Denominator, |
| }) |
| } |
| |
| priorityChildNames, priorities := groupLocalitiesByPriority(edsResp.Localities) |
| retNames := make([]string, 0, len(priorityChildNames)) |
| retAddrs := make([]resolver.Address, 0, len(priorityChildNames)) |
| retConfigs := make(map[string]*clusterimpl.LBConfig, len(priorityChildNames)) |
| for _, priorityName := range priorityChildNames { |
| priorityLocalities := priorities[priorityName] |
| // Prepend parent priority to the priority names, to avoid duplicates. |
| pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName) |
| retNames = append(retNames, pName) |
| cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy) |
| if err != nil { |
| return nil, nil, nil, err |
| } |
| retConfigs[pName] = cfg |
| retAddrs = append(retAddrs, addrs...) |
| } |
| return retNames, retConfigs, retAddrs, nil |
| } |
| |
| // groupLocalitiesByPriority returns the localities grouped by priority. |
| // |
| // It also returns a list of strings where each string represents a priority, |
| // and the list is sorted from higher priority to lower priority. |
| // |
| // For example, for L0-p0, L1-p0, L2-p1, results will be |
| // - ["p0", "p1"] |
| // - map{"p0":[L0, L1], "p1":[L2]} |
| func groupLocalitiesByPriority(localities []resource.Locality) ([]string, map[string][]resource.Locality) { |
| var priorityIntSlice []int |
| priorities := make(map[string][]resource.Locality) |
| for _, locality := range localities { |
| if locality.Weight == 0 { |
| continue |
| } |
| priorityName := fmt.Sprintf("%v", locality.Priority) |
| priorities[priorityName] = append(priorities[priorityName], locality) |
| priorityIntSlice = append(priorityIntSlice, int(locality.Priority)) |
| } |
| // Sort the priorities based on the int value, deduplicate, and then turn |
| // the sorted list into a string list. This will be child names, in priority |
| // order. |
| sort.Ints(priorityIntSlice) |
| priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice) |
| priorityNameSlice := make([]string, 0, len(priorityIntSliceDeduped)) |
| for _, p := range priorityIntSliceDeduped { |
| priorityNameSlice = append(priorityNameSlice, fmt.Sprintf("%v", p)) |
| } |
| return priorityNameSlice, priorities |
| } |
| |
| func dedupSortedIntSlice(a []int) []int { |
| if len(a) == 0 { |
| return a |
| } |
| i, j := 0, 1 |
| for ; j < len(a); j++ { |
| if a[i] == a[j] { |
| continue |
| } |
| i++ |
| if i != j { |
| a[i] = a[j] |
| } |
| } |
| return a[:i+1] |
| } |
| |
| // rrBalancerConfig is a const roundrobin config, used as child of |
| // weighted-roundrobin. To avoid allocating memory everytime. |
| var rrBalancerConfig = &internalserviceconfig.BalancerConfig{Name: roundrobin.Name} |
| |
| // priorityLocalitiesToClusterImpl takes a list of localities (with the same |
| // priority), and generates a cluster impl policy config, and a list of |
| // addresses. |
| func priorityLocalitiesToClusterImpl(localities []resource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) { |
| clusterImplCfg := &clusterimpl.LBConfig{ |
| Cluster: mechanism.Cluster, |
| EDSServiceName: mechanism.EDSServiceName, |
| LoadReportingServerName: mechanism.LoadReportingServerName, |
| MaxConcurrentRequests: mechanism.MaxConcurrentRequests, |
| DropCategories: drops, |
| // ChildPolicy is not set. Will be set based on xdsLBPolicy |
| } |
| |
| if xdsLBPolicy == nil || xdsLBPolicy.Name == rrName { |
| // If lb policy is ROUND_ROBIN: |
| // - locality-picking policy is weighted_target |
| // - endpoint-picking policy is round_robin |
| dubbogoLogger.Infof("xds lb policy is %q, building config with weighted_target + round_robin", rrName) |
| // Child of weighted_target is hardcoded to round_robin. |
| wtConfig, addrs := localitiesToWeightedTarget(localities, priorityName, rrBalancerConfig) |
| clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig} |
| return clusterImplCfg, addrs, nil |
| } |
| |
| if xdsLBPolicy.Name == rhName { |
| // If lb policy is RIHG_HASH, will build one ring_hash policy as child. |
| // The endpoints from all localities will be flattened to one addresses |
| // list, and the ring_hash policy will pick endpoints from it. |
| dubbogoLogger.Infof("xds lb policy is %q, building config with ring_hash", rhName) |
| addrs := localitiesToRingHash(localities, priorityName) |
| // Set child to ring_hash, note that the ring_hash config is from |
| // xdsLBPolicy. |
| clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: ringhash.Name, Config: xdsLBPolicy.Config} |
| return clusterImplCfg, addrs, nil |
| } |
| |
| return nil, nil, fmt.Errorf("unsupported xds LB policy %q, not one of {%q,%q}", xdsLBPolicy.Name, rrName, rhName) |
| } |
| |
| // localitiesToRingHash takes a list of localities (with the same priority), and |
| // generates a list of addresses. |
| // |
| // The addresses have path hierarchy set to [priority-name], so priority knows |
| // which child policy they are for. |
| func localitiesToRingHash(localities []resource.Locality, priorityName string) []resolver.Address { |
| var addrs []resolver.Address |
| for _, locality := range localities { |
| var lw uint32 = 1 |
| if locality.Weight != 0 { |
| lw = locality.Weight |
| } |
| localityStr, err := locality.ID.ToString() |
| if err != nil { |
| localityStr = fmt.Sprintf("%+v", locality.ID) |
| } |
| for _, endpoint := range locality.Endpoints { |
| // Filter out all "unhealthy" endpoints (unknown and healthy are |
| // both considered to be healthy: |
| // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). |
| if endpoint.HealthStatus != resource.EndpointHealthStatusHealthy && endpoint.HealthStatus != resource.EndpointHealthStatusUnknown { |
| continue |
| } |
| |
| var ew uint32 = 1 |
| if endpoint.Weight != 0 { |
| ew = endpoint.Weight |
| } |
| |
| // The weight of each endpoint is locality_weight * endpoint_weight. |
| ai := weightedroundrobin.AddrInfo{Weight: lw * ew} |
| addr := weightedroundrobin.SetAddrInfo(resolver.Address{Addr: endpoint.Address}, ai) |
| addr = hierarchy.Set(addr, []string{priorityName, localityStr}) |
| addr = resource.SetLocalityID(addr, locality.ID) |
| addrs = append(addrs, addr) |
| } |
| } |
| return addrs |
| } |
| |
| // localitiesToWeightedTarget takes a list of localities (with the same |
| // priority), and generates a weighted target config, and list of addresses. |
| // |
| // The addresses have path hierarchy set to [priority-name, locality-name], so |
| // priority and weighted target know which child policy they are for. |
| func localitiesToWeightedTarget(localities []resource.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig) (*TargetLBConfig, []resolver.Address) { |
| weightedTargets := make(map[string]Target) |
| var addrs []resolver.Address |
| for _, locality := range localities { |
| localityStr, err := locality.ID.ToString() |
| if err != nil { |
| localityStr = fmt.Sprintf("%+v", locality.ID) |
| } |
| weightedTargets[localityStr] = Target{Weight: locality.Weight, ChildPolicy: childPolicy} |
| for _, endpoint := range locality.Endpoints { |
| // Filter out all "unhealthy" endpoints (unknown and healthy are |
| // both considered to be healthy: |
| // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). |
| if endpoint.HealthStatus != resource.EndpointHealthStatusHealthy && endpoint.HealthStatus != resource.EndpointHealthStatusUnknown { |
| continue |
| } |
| |
| addr := resolver.Address{Addr: endpoint.Address} |
| if childPolicy.Name == weightedroundrobin.Name && endpoint.Weight != 0 { |
| ai := weightedroundrobin.AddrInfo{Weight: endpoint.Weight} |
| addr = weightedroundrobin.SetAddrInfo(addr, ai) |
| } |
| addr = hierarchy.Set(addr, []string{priorityName, localityStr}) |
| addr = resource.SetLocalityID(addr, locality.ID) |
| addrs = append(addrs, addr) |
| } |
| } |
| return &TargetLBConfig{Targets: weightedTargets}, addrs |
| } |