blob: 6ae5c8500a29011ac967ee5fbf18f5c59fed1077 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2020 gRPC authors.
*
*/
// Package load provides functionality to record and maintain load data.
package load
import (
"sync"
"sync/atomic"
"time"
)
// negativeOneUInt64 is a uint64 with all 64 bits set, i.e. the two's-complement
// bit pattern of -1. Adding it to an unsigned counter wraps around and thereby
// subtracts one, which lets atomic.AddUint64 be used as a decrement.
const negativeOneUInt64 uint64 = 1<<64 - 1
// Store keeps the loads for multiple clusters and services to be reported via
// LRS. It contains loads to be reported to one LRS server. Create multiple
// stores for multiple servers.
//
// It is safe for concurrent use.
type Store struct {
	// mu only protects the map (2 layers). Reads and writes on the
	// *perClusterStore values themselves don't need to hold mu.
	mu sync.Mutex
	// clusters is a map with cluster name as the key. The second layer is a map
	// with service name as the key. Each value (perClusterStore) contains data
	// for a (cluster, service) pair.
	//
	// Note that new entries are added to this map, but never removed. This is
	// potentially a memory leak. But the memory is allocated for each new
	// (cluster,service) pair, and the memory allocated is just pointers and
	// maps. So this shouldn't get too bad.
	clusters map[string]map[string]*perClusterStore
}
// NewStore returns an empty Store whose cluster map is already allocated and
// ready to accept (cluster, service) entries.
func NewStore() *Store {
	store := new(Store)
	store.clusters = map[string]map[string]*perClusterStore{}
	return store
}
// Stats returns the load data for the given cluster names. Data is returned in
// a slice with no specific order.
//
// If no clusterName is given (an empty slice), all data for all known clusters
// is returned. Names that are not known to the store are ignored.
//
// If a cluster's Data is empty (no load to report), it's not appended to the
// returned slice.
//
// Calling Stats on a nil Store is allowed and returns nil, mirroring
// PerCluster, which also accepts a nil Store.
func (s *Store) Stats(clusterNames []string) []*Data {
	if s == nil {
		return nil
	}

	// mu is held for the whole call because both layers of the map are
	// iterated/looked up below.
	s.mu.Lock()
	defer s.mu.Unlock()

	var ret []*Data
	if len(clusterNames) == 0 {
		for _, c := range s.clusters {
			ret = appendClusterStats(ret, c)
		}
		return ret
	}

	for _, n := range clusterNames {
		if c, ok := s.clusters[n]; ok {
			ret = appendClusterStats(ret, c)
		}
	}
	return ret
}
// appendClusterStats collects the Data of every service in the given cluster
// and appends it to ret, returning the (possibly grown) slice.
//
// Services whose stats() yields nil, i.e. that have nothing to report, are
// left out.
func appendClusterStats(ret []*Data, cluster map[string]*perClusterStore) []*Data {
	for _, service := range cluster {
		if report := service.stats(); report != nil {
			ret = append(ret, report)
		}
	}
	return ret
}
// PerCluster returns the perClusterStore for the given clusterName +
// serviceName, creating it on first use. Subsequent calls with the same pair
// return the same instance.
//
// Calling PerCluster on a nil Store returns a nil PerClusterReporter.
func (s *Store) PerCluster(clusterName, serviceName string) PerClusterReporter {
	if s == nil {
		return nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Allocate lazily so that a zero-value Store (one not built by NewStore)
	// does not panic on the map writes below.
	if s.clusters == nil {
		s.clusters = make(map[string]map[string]*perClusterStore)
	}
	c, ok := s.clusters[clusterName]
	if !ok {
		c = make(map[string]*perClusterStore)
		s.clusters[clusterName] = c
	}

	if p, ok := c[serviceName]; ok {
		return p
	}
	p := &perClusterStore{
		cluster: clusterName,
		service: serviceName,
		// Start the reporting clock at creation. Leaving this as the zero
		// time.Time would make the first stats() call compute its report
		// interval relative to year 1 instead of the store's lifetime.
		lastLoadReportAt: time.Now(),
	}
	c[serviceName] = p
	return p
}
// perClusterStore is a repository for LB policy implementations to report
// load data. It contains load for a (cluster, edsService) pair.
//
// It is safe for concurrent use.
//
// TODO(easwars): Use regular maps with mutexes instead of sync.Map here. The
// latter is optimized for two common use cases: (1) when the entry for a given
// key is only ever written once but read many times, as in caches that only
// grow, or (2) when multiple goroutines read, write, and overwrite entries for
// disjoint sets of keys. In these two cases, use of a Map may significantly
// reduce lock contention compared to a Go map paired with a separate Mutex or
// RWMutex.
// Neither of these conditions are met here, and we should transition to a
// regular map with a mutex for better type safety.
type perClusterStore struct {
	// cluster and service name the pair this store collects load for; stats()
	// copies them into every Data it returns.
	cluster, service string
	// drops holds one drop counter per category; the counters are updated and
	// read with atomic operations.
	drops sync.Map // map[string]*uint64
	// localityRPCCount holds the request counters and server loads recorded
	// for each locality.
	localityRPCCount sync.Map // map[string]*rpcCountData
	// mu guards lastLoadReportAt.
	mu sync.Mutex
	// lastLoadReportAt is updated on every stats() call and is what stats()
	// measures Data.ReportInterval against.
	lastLoadReportAt time.Time
}
// Update functions are called by picker for each RPC. To avoid contention, all
// updates are done atomically.
// CallDropped records one dropped call under the given category. The counter
// for a category is created the first time that category is seen.
//
// It is a no-op on a nil receiver.
func (ls *perClusterStore) CallDropped(category string) {
	if ls == nil {
		return
	}

	counter, found := ls.drops.Load(category)
	if !found {
		// LoadOrStore returns the existing counter if another goroutine
		// registered this category in the meantime.
		counter, _ = ls.drops.LoadOrStore(category, new(uint64))
	}
	atomic.AddUint64(counter.(*uint64), 1)
}
// CallStarted records that a call to the given locality has started by
// incrementing that locality's in-progress count. The locality's counters are
// created the first time the locality is seen.
//
// It is a no-op on a nil receiver.
func (ls *perClusterStore) CallStarted(locality string) {
	if ls == nil {
		return
	}

	entry, found := ls.localityRPCCount.Load(locality)
	if !found {
		// LoadOrStore returns the existing entry if another goroutine
		// registered this locality in the meantime.
		entry, _ = ls.localityRPCCount.LoadOrStore(locality, newRPCCountData())
	}
	entry.(*rpcCountData).incrInProgress()
}
// CallFinished records that a call to the given locality has finished: the
// in-progress count is decremented and either the succeeded count (err is nil)
// or the errored count (err is non-nil) is incremented.
//
// It is a no-op on a nil receiver and for a locality that has no entry.
func (ls *perClusterStore) CallFinished(locality string, err error) {
	if ls == nil {
		return
	}

	entry, found := ls.localityRPCCount.Load(locality)
	if !found {
		// Entries are only ever reset, never removed, so a finished call
		// without a matching entry is not expected to happen.
		return
	}

	counts := entry.(*rpcCountData)
	counts.decrInProgress()
	if err != nil {
		counts.incrErrored()
		return
	}
	counts.incrSucceeded()
}
// CallServerLoad records one server load sample for the given locality. name
// identifies the kind of load being reported and d is the sampled value.
//
// It is a no-op on a nil receiver and for a locality that has no entry.
func (ls *perClusterStore) CallServerLoad(locality, name string, d float64) {
	if ls == nil {
		return
	}

	entry, found := ls.localityRPCCount.Load(locality)
	if found {
		entry.(*rpcCountData).addServerLoad(name, d)
	}
	// Entries are only ever reset, never removed, so a load report without a
	// matching entry is not expected to happen; it is silently ignored.
}
// Data contains all load data reported to the Store since the most recent call
// to stats().
type Data struct {
	// Cluster is the name of the cluster this data is for.
	Cluster string
	// Service is the name of the EDS service this data is for.
	Service string
	// TotalDrops is the total number of dropped requests, including drops that
	// were reported with an empty category.
	TotalDrops uint64
	// Drops is the number of dropped requests per category. Drops reported
	// with an empty category are counted in TotalDrops only.
	Drops map[string]uint64
	// LocalityStats contains load reports per locality.
	LocalityStats map[string]LocalityData
	// ReportInterval is the duration since last time load was reported (stats()
	// was called).
	ReportInterval time.Duration
}
// LocalityData contains load data for a single locality.
type LocalityData struct {
	// RequestStats contains counts of requests made to the locality.
	RequestStats RequestData
	// LoadStats contains server load data for requests made to the locality,
	// indexed by the load type. Load types with no reports since the previous
	// stats() call are omitted.
	LoadStats map[string]ServerLoadData
}
// RequestData contains request counts.
type RequestData struct {
	// Succeeded is the number of succeeded requests since the previous stats()
	// call.
	Succeeded uint64
	// Errored is the number of requests which ran into errors since the
	// previous stats() call.
	Errored uint64
	// InProgress is the number of requests in flight. Unlike the other two
	// counters, it is not reset when stats() is called.
	InProgress uint64
}
// ServerLoadData contains server load data for one load type.
type ServerLoadData struct {
	// Count is the number of load reports.
	Count uint64
	// Sum is the total value of all load reports.
	Sum float64
}
// newData returns a Data for the given cluster and service with its Drops and
// LocalityStats maps allocated (and empty), so callers can insert into them
// right away. All other fields are left at their zero values.
func newData(cluster, service string) *Data {
	d := new(Data)
	d.Cluster, d.Service = cluster, service
	d.Drops = map[string]uint64{}
	d.LocalityStats = map[string]LocalityData{}
	return d
}
// stats returns and resets all loads reported to the store, except inProgress
// rpc counts, which are reported but left untouched.
//
// The returned Data carries the time elapsed since the previous stats() call
// in ReportInterval. The report timestamp is advanced on every call, including
// calls that return nil.
//
// It returns nil if the store doesn't contain any (new) data, or if the
// receiver is nil.
func (ls *perClusterStore) stats() *Data {
	if ls == nil {
		return nil
	}

	sd := newData(ls.cluster, ls.service)
	ls.drops.Range(func(key, val interface{}) bool {
		d := atomic.SwapUint64(val.(*uint64), 0)
		if d == 0 {
			return true
		}
		sd.TotalDrops += d
		keyStr := key.(string)
		if keyStr != "" {
			// Skip drops without category. They are counted in total_drops, but
			// not in per category. One example is drops by circuit breaking.
			sd.Drops[keyStr] = d
		}
		return true
	})
	ls.localityRPCCount.Range(func(key, val interface{}) bool {
		countData := val.(*rpcCountData)
		succeeded := countData.loadAndClearSucceeded()
		inProgress := countData.loadInProgress()
		errored := countData.loadAndClearErrored()
		if succeeded == 0 && inProgress == 0 && errored == 0 {
			// Nothing happened on this locality; leave it out of the report.
			return true
		}

		ld := LocalityData{
			RequestStats: RequestData{
				Succeeded:  succeeded,
				Errored:    errored,
				InProgress: inProgress,
			},
			LoadStats: make(map[string]ServerLoadData),
		}
		countData.serverLoads.Range(func(key, val interface{}) bool {
			sum, count := val.(*rpcLoadData).loadAndClear()
			if count == 0 {
				return true
			}
			ld.LoadStats[key.(string)] = ServerLoadData{
				Count: count,
				Sum:   sum,
			}
			return true
		})
		sd.LocalityStats[key.(string)] = ld
		return true
	})

	ls.mu.Lock()
	// Read the clock once, under the lock, and use that single reading both as
	// the end of this interval and the start of the next one. Reading it twice
	// would drop the time between the two readings from every interval.
	now := time.Now()
	sd.ReportInterval = now.Sub(ls.lastLoadReportAt)
	ls.lastLoadReportAt = now
	ls.mu.Unlock()

	if sd.TotalDrops == 0 && len(sd.Drops) == 0 && len(sd.LocalityStats) == 0 {
		return nil
	}
	return sd
}
// rpcCountData holds the request counters and the server loads recorded for a
// single locality.
type rpcCountData struct {
	// Only atomic accesses are allowed for the fields.
	succeeded  *uint64
	errored    *uint64
	inProgress *uint64

	// Map from load desc to load data (sum+count). Loading data from map is
	// atomic, but updating data takes a lock, which could cause contention when
	// multiple RPCs try to report loads for the same desc.
	//
	// To fix the contention, shard this map.
	serverLoads sync.Map // map[string]*rpcLoadData
}

// newRPCCountData returns an rpcCountData with all three counters allocated
// and set to zero.
func newRPCCountData() *rpcCountData {
	var succeeded, errored, inProgress uint64
	return &rpcCountData{
		succeeded:  &succeeded,
		errored:    &errored,
		inProgress: &inProgress,
	}
}

// incrSucceeded atomically adds one to the succeeded counter.
func (rcd *rpcCountData) incrSucceeded() {
	const one uint64 = 1
	atomic.AddUint64(rcd.succeeded, one)
}

// loadAndClearSucceeded atomically resets the succeeded counter to zero and
// returns the value it held.
func (rcd *rpcCountData) loadAndClearSucceeded() uint64 {
	previous := atomic.SwapUint64(rcd.succeeded, 0)
	return previous
}

// incrErrored atomically adds one to the errored counter.
func (rcd *rpcCountData) incrErrored() {
	const one uint64 = 1
	atomic.AddUint64(rcd.errored, one)
}

// loadAndClearErrored atomically resets the errored counter to zero and
// returns the value it held.
func (rcd *rpcCountData) loadAndClearErrored() uint64 {
	previous := atomic.SwapUint64(rcd.errored, 0)
	return previous
}

// incrInProgress atomically adds one to the in-progress counter.
func (rcd *rpcCountData) incrInProgress() {
	const one uint64 = 1
	atomic.AddUint64(rcd.inProgress, one)
}

// decrInProgress atomically subtracts one from the in-progress counter.
func (rcd *rpcCountData) decrInProgress() {
	// Unsigned addition wraps, so adding the all-ones value is atomic.Add(x, -1).
	atomic.AddUint64(rcd.inProgress, negativeOneUInt64)
}

// loadInProgress atomically reads the in-progress counter. Unlike the other
// counters, it is not cleared by reading.
func (rcd *rpcCountData) loadInProgress() uint64 {
	current := atomic.LoadUint64(rcd.inProgress)
	return current
}

// addServerLoad adds the sample d to the load data stored under name, creating
// that load data the first time name is seen.
func (rcd *rpcCountData) addServerLoad(name string, d float64) {
	entry, found := rcd.serverLoads.Load(name)
	if !found {
		// LoadOrStore returns the existing entry if another goroutine
		// registered this name in the meantime.
		entry, _ = rcd.serverLoads.LoadOrStore(name, newRPCLoadData())
	}
	entry.(*rpcLoadData).add(d)
}
// rpcLoadData accumulates server load samples (from trailers or oob) for one
// load type. sum and count must always be updated together, so both are
// guarded by mu.
//
// Holding a lock could cause contention. To fix, shard the serverLoads map in
// rpcCountData.
type rpcLoadData struct {
	mu    sync.Mutex
	sum   float64
	count uint64
}

// newRPCLoadData returns an empty rpcLoadData (zero sum, zero count).
func newRPCLoadData() *rpcLoadData {
	return new(rpcLoadData)
}

// add records one sample: v is added to the running sum and the sample count
// goes up by one.
func (rld *rpcLoadData) add(v float64) {
	rld.mu.Lock()
	defer rld.mu.Unlock()

	rld.count++
	rld.sum += v
}

// loadAndClear returns the accumulated sum and sample count, in that order,
// and resets both to zero.
func (rld *rpcLoadData) loadAndClear() (float64, uint64) {
	rld.mu.Lock()
	defer rld.mu.Unlock()

	sum, count := rld.sum, rld.count
	rld.sum, rld.count = 0, 0
	return sum, count
}