blob: 78db02657258869457c250fd06dc4e91471924b5 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// packages used for load balancer setting
package loadbalancer
import (
"math"
"sort"
)
import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
wrappers "google.golang.org/protobuf/types/known/wrapperspb"
"istio.io/api/networking/v1alpha3"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
)
func GetLocalityLbSetting(
mesh *v1alpha3.LocalityLoadBalancerSetting,
destrule *v1alpha3.LocalityLoadBalancerSetting,
) *v1alpha3.LocalityLoadBalancerSetting {
var enabled bool
// Locality lb is enabled if its not explicitly disabled in mesh global config
if mesh != nil && (mesh.Enabled == nil || mesh.Enabled.Value) {
enabled = true
}
// Unless we explicitly override this in destination rule
if destrule != nil {
if destrule.Enabled != nil && !destrule.Enabled.Value {
enabled = false
} else {
enabled = true
}
}
if !enabled {
return nil
}
// Destination Rule overrides mesh config. If its defined, use that
if destrule != nil {
return destrule
}
// Otherwise fall back to mesh default
return mesh
}
func ApplyLocalityLBSetting(
loadAssignment *endpoint.ClusterLoadAssignment,
wrappedLocalityLbEndpoints []*WrappedLocalityLbEndpoints,
locality *core.Locality,
proxyLabels map[string]string,
localityLB *v1alpha3.LocalityLoadBalancerSetting,
enableFailover bool,
) {
if localityLB == nil || loadAssignment == nil {
return
}
// one of Distribute or Failover settings can be applied.
if localityLB.GetDistribute() != nil {
applyLocalityWeight(locality, loadAssignment, localityLB.GetDistribute())
// Failover needs outlier detection, otherwise Envoy will never drop down to a lower priority.
// Do not apply default failover when locality LB is disabled.
} else if enableFailover && (localityLB.Enabled == nil || localityLB.Enabled.Value) {
if len(localityLB.FailoverPriority) > 0 {
applyPriorityFailover(loadAssignment, wrappedLocalityLbEndpoints, proxyLabels, localityLB.FailoverPriority)
return
}
applyLocalityFailover(locality, loadAssignment, localityLB.Failover)
}
}
// set locality loadbalancing weight
func applyLocalityWeight(
locality *core.Locality,
loadAssignment *endpoint.ClusterLoadAssignment,
distribute []*v1alpha3.LocalityLoadBalancerSetting_Distribute) {
if distribute == nil {
return
}
// Support Locality weighted load balancing
// (https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/locality_weight#locality-weighted-load-balancing)
// by providing weights in LocalityLbEndpoints via load_balancing_weight.
// By setting weights across different localities, it can allow
// Envoy to weight assignments across different zones and geographical locations.
for _, localityWeightSetting := range distribute {
if localityWeightSetting != nil &&
util.LocalityMatch(locality, localityWeightSetting.From) {
misMatched := map[int]struct{}{}
for i := range loadAssignment.Endpoints {
misMatched[i] = struct{}{}
}
for locality, weight := range localityWeightSetting.To {
// index -> original weight
destLocMap := map[int]uint32{}
totalWeight := uint32(0)
for i, ep := range loadAssignment.Endpoints {
if _, exist := misMatched[i]; exist {
if util.LocalityMatch(ep.Locality, locality) {
delete(misMatched, i)
if ep.LoadBalancingWeight != nil {
destLocMap[i] = ep.LoadBalancingWeight.Value
} else {
destLocMap[i] = 1
}
totalWeight += destLocMap[i]
}
}
}
// in case wildcard dest matching multi groups of endpoints
// the load balancing weight for a locality is divided by the sum of the weights of all localities
for index, originalWeight := range destLocMap {
destWeight := float64(originalWeight*weight) / float64(totalWeight)
if destWeight > 0 {
loadAssignment.Endpoints[index].LoadBalancingWeight = &wrappers.UInt32Value{
Value: uint32(math.Ceil(destWeight)),
}
}
}
}
// remove groups of endpoints in a locality that miss matched
for i := range misMatched {
loadAssignment.Endpoints[i].LbEndpoints = nil
}
break
}
}
}
// set locality loadbalancing priority
func applyLocalityFailover(
locality *core.Locality,
loadAssignment *endpoint.ClusterLoadAssignment,
failover []*v1alpha3.LocalityLoadBalancerSetting_Failover) {
// key is priority, value is the index of the LocalityLbEndpoints in ClusterLoadAssignment
priorityMap := map[int][]int{}
// 1. calculate the LocalityLbEndpoints.Priority compared with proxy locality
for i, localityEndpoint := range loadAssignment.Endpoints {
// if region/zone/subZone all match, the priority is 0.
// if region/zone match, the priority is 1.
// if region matches, the priority is 2.
// if locality not match, the priority is 3.
priority := util.LbPriority(locality, localityEndpoint.Locality)
// region not match, apply failover settings when specified
// update localityLbEndpoints' priority to 4 if failover not match
if priority == 3 {
for _, failoverSetting := range failover {
if failoverSetting.From == locality.Region {
if localityEndpoint.Locality == nil || localityEndpoint.Locality.Region != failoverSetting.To {
priority = 4
}
break
}
}
}
loadAssignment.Endpoints[i].Priority = uint32(priority)
priorityMap[priority] = append(priorityMap[priority], i)
}
// since Priorities should range from 0 (highest) to N (lowest) without skipping.
// 2. adjust the priorities in order
// 2.1 sort all priorities in increasing order.
priorities := []int{}
for priority := range priorityMap {
priorities = append(priorities, priority)
}
sort.Ints(priorities)
// 2.2 adjust LocalityLbEndpoints priority
// if the index and value of priorities array is not equal.
for i, priority := range priorities {
if i != priority {
// the LocalityLbEndpoints index in ClusterLoadAssignment.Endpoints
for _, index := range priorityMap[priority] {
loadAssignment.Endpoints[index].Priority = uint32(i)
}
}
}
}
// WrappedLocalityLbEndpoints contain an envoy LocalityLbEndpoints
// and the original IstioEndpoints used to generate it.
// It is used to do failover priority label match with proxy labels.
type WrappedLocalityLbEndpoints struct {
IstioEndpoints []*model.IstioEndpoint
LocalityLbEndpoints *endpoint.LocalityLbEndpoints
}
// set loadbalancing priority by failover priority label
func applyPriorityFailover(
loadAssignment *endpoint.ClusterLoadAssignment,
wrappedLocalityLbEndpoints []*WrappedLocalityLbEndpoints,
proxyLabels map[string]string,
failoverPriorities []string) {
if len(proxyLabels) == 0 || len(wrappedLocalityLbEndpoints) == 0 {
return
}
priorityMap := make(map[int][]int, len(failoverPriorities))
localityLbEndpoints := []*endpoint.LocalityLbEndpoints{}
for _, wrappedLbEndpoint := range wrappedLocalityLbEndpoints {
localityLbEndpointsPerLocality := applyPriorityFailoverPerLocality(proxyLabels, wrappedLbEndpoint, failoverPriorities)
localityLbEndpoints = append(localityLbEndpoints, localityLbEndpointsPerLocality...)
}
for i, ep := range localityLbEndpoints {
priorityMap[int(ep.Priority)] = append(priorityMap[int(ep.Priority)], i)
}
// since Priorities should range from 0 (highest) to N (lowest) without skipping.
// adjust the priorities in order
// 1. sort all priorities in increasing order.
priorities := []int{}
for priority := range priorityMap {
priorities = append(priorities, priority)
}
sort.Ints(priorities)
// 2. adjust LocalityLbEndpoints priority
// if the index and value of priorities array is not equal.
for i, priority := range priorities {
if i != priority {
// the LocalityLbEndpoints index in ClusterLoadAssignment.Endpoints
for _, index := range priorityMap[priority] {
localityLbEndpoints[index].Priority = uint32(i)
}
}
}
loadAssignment.Endpoints = localityLbEndpoints
}
// set loadbalancing priority by failover priority label.
// split one LocalityLbEndpoints to multiple LocalityLbEndpoints based on failover priorities.
func applyPriorityFailoverPerLocality(
proxyLabels map[string]string,
ep *WrappedLocalityLbEndpoints,
failoverPriorities []string) []*endpoint.LocalityLbEndpoints {
lowestPriority := len(failoverPriorities)
// key is priority, value is the index of LocalityLbEndpoints.LbEndpoints
priorityMap := map[int][]int{}
for i, istioEndpoint := range ep.IstioEndpoints {
var priority int
// failoverPriority labels match
for j, label := range failoverPriorities {
if proxyLabels[label] != istioEndpoint.Labels[label] {
priority = lowestPriority - j
break
}
}
priorityMap[priority] = append(priorityMap[priority], i)
}
// sort all priorities in increasing order.
priorities := []int{}
for priority := range priorityMap {
priorities = append(priorities, priority)
}
sort.Ints(priorities)
out := make([]*endpoint.LocalityLbEndpoints, len(priorityMap))
for i, priority := range priorities {
out[i] = util.CloneLocalityLbEndpoint(ep.LocalityLbEndpoints)
out[i].LbEndpoints = nil
out[i].Priority = uint32(priority)
var weight uint32
for _, index := range priorityMap[priority] {
out[i].LbEndpoints = append(out[i].LbEndpoints, ep.LocalityLbEndpoints.LbEndpoints[index])
weight += ep.LocalityLbEndpoints.LbEndpoints[index].GetLoadBalancingWeight().GetValue()
}
// reset weight
out[i].LoadBalancingWeight = &wrappers.UInt32Value{
Value: weight,
}
}
return out
}