blob: e0b756666c3e6352d02ca43a6ff2af75d20e31d3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ringhash
import (
"fmt"
"math"
"math/bits"
"sort"
"strconv"
)
import (
"github.com/cespare/xxhash/v2"
)
import (
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
"dubbo.apache.org/dubbo-go/v3/xds/utils/grpcrand"
)
// invokerWithWeight pairs an invoker with its normalized weight, as produced
// by normalizeWeights.
type invokerWithWeight struct {
// invoker is the invoker the weight belongs to.
invoker protocol.Invoker
// weight is the invoker's weight divided by the sum of all weights, so the
// weights of one normalized set add up to 1.
weight float64
}
// ringEntry is one node of the hash ring built by generateRing.
type ringEntry struct {
// idx is the entry's position in the ring slice once it has been sorted by
// hash.
idx int
// hash is the ring position of this entry; generateRing derives it from
// the invoker's URL string plus a running counter.
hash uint64
// invoker is the invoker this entry was generated for.
invoker protocol.Invoker
}
// generateRing builds the hash ring for the given invokers.
//
// The invokers' weights are first normalized so they add up to 1 (for example
// {3,3,4} becomes {0.3,0.3,0.4}). The ring is then scaled so that the
// least-weighted invoker receives a whole number of entries for minRingSize,
// with the scale capped at maxRingSize.
//
// The returned entries are sorted by ascending hash and every entry's idx
// equals its position in the slice, ready for binary search. An error from
// normalizeWeights is returned unchanged together with a nil ring.
func (lb *ringhashLoadBalance) generateRing(invokers []invokerWrapper, minRingSize, maxRingSize uint64) ([]*ringEntry, error) {
	weighted, smallest, err := normalizeWeights(invokers)
	if err != nil {
		return nil, err
	}

	// Scale that gives the least-weighted invoker a whole number of entries,
	// limited by the configured maximum.
	wanted := math.Ceil(smallest*float64(minRingSize)) / smallest
	scale := math.Min(wanted, float64(maxRingSize))
	ring := make([]*ringEntry, 0, int(math.Ceil(scale)))

	// Each invoker owns scale*weight entries. Those products are generally
	// not whole numbers, so a running fractional boundary is kept and entries
	// are emitted while the integer cursor is still below it.
	next := 0
	boundary := 0.0
	for i := range weighted {
		w := weighted[i]
		boundary += scale * w.weight
		for ; float64(next) < boundary; next++ {
			// The running entry count is appended so that entries of the same
			// invoker hash to different ring positions.
			key := w.invoker.GetURL().String() + strconv.Itoa(len(ring))
			ring = append(ring, &ringEntry{
				idx:     next,
				hash:    xxhash.Sum64String(key),
				invoker: w.invoker,
			})
		}
	}

	// Order by hash so lookups can binary search, then renumber the entries
	// to reflect their sorted position.
	sort.Slice(ring, func(a, b int) bool { return ring[a].hash < ring[b].hash })
	for pos := range ring {
		ring[pos].idx = pos
	}
	return ring, nil
}
// normalizeWeights divides every invoker's weight by the sum of all weights,
// so that the returned weights add up to 1.
//
// It returns the normalized invokers in input order, the smallest strictly
// positive normalized weight, and an error when the weights sum to 0 (which
// includes an empty input).
//
// Invokers whose weight is 0 stay in the result with a normalized weight of 0,
// but they are ignored when computing the minimum: generateRing divides by
// that minimum, and a zero minimum would turn the ring scale into NaN.
func normalizeWeights(invokers []invokerWrapper) ([]invokerWithWeight, float64, error) {
	var weightSum int
	for _, v := range invokers {
		weightSum += v.weight
	}
	if weightSum == 0 {
		return nil, 0, fmt.Errorf("total weight of all endpoints is 0")
	}

	total := float64(weightSum)
	normalized := make([]invokerWithWeight, 0, len(invokers))
	// A non-zero sum guarantees at least one strictly positive normalized
	// weight, so minWeight is always lowered below this sentinel.
	minWeight := math.MaxFloat64
	for _, v := range invokers {
		nw := float64(v.weight) / total
		normalized = append(normalized, invokerWithWeight{invoker: v.invoker, weight: nw})
		if nw > 0 && nw < minWeight {
			minWeight = nw
		}
	}
	return normalized, minWeight, nil
}
// pick returns the ring entry responsible for hash h.
//
// items must be sorted by ascending hash, as generateRing returns them. The
// result is the first entry whose hash is greater than or equal to h; when h
// is larger than every hash on the ring the lookup wraps around to the first
// entry. An empty ring has no entry to offer, so nil is returned instead of
// indexing out of range.
func (lb *ringhashLoadBalance) pick(h uint64, items []*ringEntry) *ringEntry {
	if len(items) == 0 {
		return nil
	}
	i := sort.Search(len(items), func(i int) bool { return items[i].hash >= h })
	if i == len(items) {
		// h lies beyond the largest hash: wrap around to the start.
		i = 0
	}
	return items[i]
}
// generateHash computes the request hash for invocation by evaluating
// hashPolicies in order.
//
// A header policy hashes the attachment stored under the policy's header
// name; if that attachment is missing or empty the policy is skipped
// entirely, including its terminal check. A channel-ID policy hashes the
// formatted address of lb.client. Policies of any other type contribute
// nothing.
//
// Evaluation stops after a terminal policy once at least one hash has been
// produced. If no policy produced a hash, a random value from grpcrand is
// returned instead.
func (lb *ringhashLoadBalance) generateHash(invocation protocol.Invocation, hashPolicies []*resource.HashPolicy) uint64 {
	var (
		result  uint64
		haveAny bool
	)
	for _, p := range hashPolicies {
		var (
			partial     uint64
			havePartial bool
		)
		switch p.HashPolicyType {
		case resource.HashPolicyTypeHeader:
			v, present := invocation.GetAttachment(p.HeaderName)
			if !present || len(v) == 0 {
				continue
			}
			partial, havePartial = xxhash.Sum64String(v), true
		case resource.HashPolicyTypeChannelID:
			// The address of the client field serves as a per-balancer
			// identity for the channel.
			partial, havePartial = xxhash.Sum64String(fmt.Sprintf("%p", &lb.client)), true
		}

		if havePartial {
			haveAny = true
			// Rotate before XOR-ing so that identical policies do not cancel
			// each other out and all 64 bits stay in play.
			result = bits.RotateLeft64(result, 1) ^ partial
		}

		// A terminal policy ends evaluation as soon as some hash exists.
		if haveAny && p.Terminal {
			break
		}
	}

	if !haveAny {
		// Nothing could be hashed: fall back to a random value, which amounts
		// to routing the request to a random ring position.
		return grpcrand.Uint64()
	}
	return result
}