blob: 3ce16ee0a3aee5d64c74beb5c5b0f2dc1aeaf35d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package limiter
import (
"math"
"sync"
"time"
)
import (
"go.uber.org/atomic"
)
// Compile-time checks that *HillClimbing can be used as a Limiter and
// *HillClimbingUpdater as an Updater.
var (
_ Limiter = (*HillClimbing)(nil)
_ Updater = (*HillClimbingUpdater)(nil)
)
// HillClimbingOption is the adjustment decided at the end of a sampling round;
// it is produced by getOption and consumed by adjustLimitation.
type HillClimbingOption int64
const (
// HillClimbingOptionShrinkPlus moves the limitation below the best known
// limitation by the larger (alpha) step.
HillClimbingOptionShrinkPlus HillClimbingOption = -2
// HillClimbingOptionShrink moves the limitation below the best known
// limitation by the smaller (beta) step.
HillClimbingOptionShrink HillClimbingOption = -1
// HillClimbingOptionDoNothing leaves the limitation untouched.
HillClimbingOptionDoNothing HillClimbingOption = 0
// HillClimbingOptionExtend raises the current limitation by the smaller
// (beta) step.
HillClimbingOptionExtend HillClimbingOption = 1
// HillClimbingOptionExtendPlus raises the current limitation by the larger
// (alpha) step.
HillClimbingOptionExtendPlus HillClimbingOption = 2
)
// Tuning parameters of the hill-climbing limiter.
var (
// initialLimitation is the limitation a new HillClimbing starts with.
initialLimitation uint64 = 50
// maxLimitation is the upper bound adjustLimitation clamps the limitation to.
maxLimitation uint64 = 500
// radicalPeriod is the update interval a new limiter starts with; while the
// interval equals this value the "Plus" options are chosen.
radicalPeriod = 1000 * time.Millisecond
// stablePeriod is the cap applied when the update interval is doubled after
// a shrink decision.
stablePeriod = 32000 * time.Millisecond
)
// HillClimbing is a limiter that adapts its concurrency limitation with a
// hill-climbing algorithm: metrics are sampled per round and the limitation is
// extended or shrunk when a round ends.
type HillClimbing struct {
// seq hands out a sequence number to every updater (used in debug logs).
seq *atomic.Uint64
// round is not referenced by the code visible in this file section.
round *atomic.Uint64
// inflight counts updaters that were created but have not finished DoUpdate.
inflight *atomic.Uint64
// limitation is the current upper bound compared against inflight.
limitation *atomic.Uint64
// mutex serializes getOption, i.e. the per-round metric bookkeeping.
mutex *sync.Mutex
// nextUpdateTime = lastUpdatedTime + updateInterval
updateInterval *atomic.Duration
lastUpdatedTime *atomic.Time
// metrics of the current round
transactionNum *atomic.Uint64
rttAvg *atomic.Float64
// best metrics in the history
bestMaxCapacity *atomic.Float64
bestRTTAvg *atomic.Float64
bestLimitation *atomic.Uint64
bestTPS *atomic.Uint64
}
// NewHillClimbing creates a hill-climbing limiter in its initial state: the
// limitation is initialLimitation, the update interval is radicalPeriod, the
// current round starts now, all counters are zero and the best RTT average is
// math.MaxFloat64, which marks "no best round recorded yet".
func NewHillClimbing() Limiter {
	return &HillClimbing{
		// counters
		seq:      atomic.NewUint64(0),
		round:    atomic.NewUint64(0),
		inflight: atomic.NewUint64(0),

		limitation: atomic.NewUint64(initialLimitation),
		mutex:      &sync.Mutex{},

		// round scheduling
		updateInterval:  atomic.NewDuration(radicalPeriod),
		lastUpdatedTime: atomic.NewTime(time.Now()),

		// metrics of the current round
		transactionNum: atomic.NewUint64(0),
		rttAvg:         atomic.NewFloat64(0),

		// best metrics in the history
		bestMaxCapacity: atomic.NewFloat64(0),
		bestRTTAvg:      atomic.NewFloat64(math.MaxFloat64),
		bestLimitation:  atomic.NewUint64(0),
		bestTPS:         atomic.NewUint64(0),
	}
}
// Inflight returns the current value of the in-flight request counter.
func (l *HillClimbing) Inflight() uint64 {
	current := l.inflight.Load()
	return current
}
// Remaining returns how many more requests fit under the current limitation,
// or 0 when the in-flight count already exceeds it.
func (l *HillClimbing) Remaining() uint64 {
	limit, used := l.limitation.Load(), l.Inflight()
	if used <= limit {
		return limit - used
	}
	// The limitation may have been lowered below the in-flight count.
	return 0
}
// Acquire admits a request when capacity remains and returns the Updater that
// must be used to report its completion. It returns ErrReachLimitation when
// Remaining reports no free capacity.
func (l *HillClimbing) Acquire() (Updater, error) {
	if free := l.Remaining(); free > 0 {
		return NewHillClimbingUpdater(l), nil
	}
	return nil, ErrReachLimitation
}
// HillClimbingUpdater tracks one admitted request of a HillClimbing limiter
// and feeds its round-trip time back into the limiter when DoUpdate is called.
type HillClimbingUpdater struct {
// startTime is when the updater was created; the RTT is measured from it.
startTime time.Time
// limiter is the limiter this request was admitted by.
limiter *HillClimbing
// for debug purposes
seq uint64
}
// NewHillClimbingUpdater registers a new in-flight request on limiter and
// returns the updater tracking it. The in-flight counter is incremented first,
// then the start time is captured and the next sequence number (starting at 0)
// is assigned.
func NewHillClimbingUpdater(limiter *HillClimbing) *HillClimbingUpdater {
	current := limiter.inflight.Add(1)

	updater := new(HillClimbingUpdater)
	updater.startTime = time.Now()
	updater.limiter = limiter
	// Add returns the incremented value, so subtract one to get this request's number.
	updater.seq = limiter.seq.Add(1) - 1

	VerboseDebugf("[NewHillClimbingUpdater] A new request arrived, seq: %d, inflight: %d, time: %s.",
		updater.seq, current, updater.startTime)
	return updater
}
// DoUpdate reports that the tracked request has finished. It measures the
// round-trip time in milliseconds, lets getOption decide how the limitation
// should move, applies that decision, and finally decrements the limiter's
// in-flight counter (also when an error is returned).
func (u *HillClimbingUpdater) DoUpdate() error {
	// Release the in-flight slot only after the limiter has been updated.
	defer u.limiter.inflight.Dec()

	VerboseDebugf("[HillClimbingUpdater] A request finished, the limiter will be updated, seq: %d.", u.seq)

	elapsed := time.Since(u.startTime)
	rtt := uint64(elapsed.Milliseconds())

	option, err := u.getOption(rtt, u.limiter.Inflight())
	if err != nil {
		return err
	}
	return u.adjustLimitation(option)
}
// getOption folds the RTT of a finished request into the current sampling
// round and, once the round is over, decides how the limitation should move.
//
// While the round is still open (less than updateInterval has elapsed since
// lastUpdatedTime and rttAvg is non-zero) the request is only counted and
// averaged, and HillClimbingOptionDoNothing is returned. Otherwise the round is
// closed: the round's maxCapacity and TPS are derived, the best-in-history
// metrics are refreshed when the TPS is a new record, an extend/shrink option
// is chosen, and the round metrics are reset.
//
// rtt is the request's round-trip time in milliseconds. The second argument
// is ignored. The returned error is always nil. The whole method runs under
// the limiter's mutex.
func (u *HillClimbingUpdater) getOption(rtt, _ uint64) (HillClimbingOption, error) {
	u.limiter.mutex.Lock()
	defer u.limiter.mutex.Unlock()

	now := time.Now()
	option := HillClimbingOptionDoNothing

	lastUpdatedTime := u.limiter.lastUpdatedTime.Load()
	updateInterval := u.limiter.updateInterval.Load()
	rttAvg := u.limiter.rttAvg.Load()
	transactionNum := u.limiter.transactionNum.Load()
	limitation := u.limiter.limitation.Load()

	// The round start lies after "now": ignore this sample.
	if now.Before(lastUpdatedTime) {
		return option, nil
	}

	if now.Sub(lastUpdatedTime) > updateInterval || rttAvg == 0 {
		// The current request closes the round, or no rttAvg is available.
		// FIXME(justxuewei): If no request of a round contributed a non-zero
		// RTT, rttAvg is 0 and the division below yields +Inf (or NaN when
		// transactionNum is 0 as well), which is not the actual capacity.
		// Converting such a value to uint64 below is implementation-dependent.
		maxCapacity := float64(transactionNum) * float64(updateInterval.Milliseconds()) / rttAvg
		VerboseDebugf("[HillClimbingUpdater] maxCapacity: %f, transactionNum: %d, rttAvg: %f, bestRTTAvg: %f, "+
			"updateInterval: %d",
			maxCapacity, transactionNum, rttAvg, u.limiter.bestRTTAvg.Load(), updateInterval.Milliseconds())

		// Consider extending limitation if concurrent is about to reach the
		// limitation, or if no best round has been recorded yet.
		if u.limiter.bestRTTAvg.Load() == math.MaxFloat64 || uint64(maxCapacity*1.5) > limitation {
			if updateInterval == radicalPeriod {
				option = HillClimbingOptionExtendPlus
			} else {
				option = HillClimbingOptionExtend
			}
		}

		tps := uint64(1000.0 * float64(transactionNum) / float64(updateInterval.Milliseconds()))
		// Log the interval in milliseconds, like every other log line here.
		VerboseDebugf("[HillClimbingUpdater] The TPS is %d, transactionNum: %d, updateInterval: %d.",
			tps, transactionNum, updateInterval.Milliseconds())

		if tps > u.limiter.bestTPS.Load() {
			VerboseDebugf("[HillClimbingUpdater] The best TPS is updated from %d to %d.",
				u.limiter.bestTPS.Load(), tps)
			// tps is the best in the history, update all best metrics.
			u.limiter.bestTPS.Store(tps)
			u.limiter.bestRTTAvg.Store(rttAvg)
			u.limiter.bestMaxCapacity.Store(maxCapacity)
			u.limiter.bestLimitation.Store(u.limiter.limitation.Load())
			// bestMaxCapacity is a float64, so it is printed with %f.
			VerboseDebugf("[HillClimbingUpdater] Best-metrics are up-to-date, "+
				"seq: %d, bestTPS: %d, bestRTTAvg: %.4f, bestMaxCapacity: %f,"+
				" bestLimitation: %d.", u.seq, u.limiter.bestTPS.Load(),
				u.limiter.bestRTTAvg.Load(), u.limiter.bestMaxCapacity.Load(),
				u.limiter.bestLimitation.Load())
		} else {
			VerboseDebugf("[HillClimbingUpdater] The best TPS is not updated, best TPS is %d, "+
				"the current TPS is %d",
				u.limiter.bestTPS.Load(), tps)
			if u.shouldShrink(transactionNum, maxCapacity, tps, rttAvg) {
				if updateInterval == radicalPeriod {
					option = HillClimbingOptionShrinkPlus
				} else {
					option = HillClimbingOptionShrink
				}
				// Shrinking limitation means the process of adjusting
				// limitation goes to stable, so extend the update interval
				// to avoid adjusting frequently.
				u.limiter.updateInterval.Store(minDuration(updateInterval*2, stablePeriod))
			}
		}

		// Reset metrics for the new round; the closing request's RTT seeds
		// the next average.
		u.limiter.transactionNum.Store(0)
		u.limiter.rttAvg.Store(float64(rtt))
		u.limiter.lastUpdatedTime.Store(time.Now())
		VerboseDebugf("[HillClimbingUpdater] A new round is applied, all metrics are reset.")
	} else {
		// Still on the current round: count the request and update the
		// running average.
		u.limiter.transactionNum.Add(1)
		// ra = (ra * c + r) / (c + 1), where ra denotes rttAvg,
		// c denotes transactionNum, r denotes rtt.
		u.limiter.rttAvg.Store((rttAvg*float64(transactionNum) + float64(rtt)) / float64(transactionNum+1))
	}

	return option, nil
}
// shouldShrink reports whether the round that just ended is sufficiently worse
// than the best recorded round to justify shrinking the limitation.
//
// maxCapacity and rttAvg are the metrics of the finished round; they are
// compared with the limiter's bestMaxCapacity and bestRTTAvg. transactionNum
// and tps are accepted but not used.
//
// It returns false when the capacity loss is small (at most 300 and at most
// 10%), or when neither the capacity dropped nor the RTT average grew.
// Otherwise it returns true only if the capacity dropped by more than 5 and
// more than 10%, and the RTT degradation passes a threshold that depends on
// the magnitude of bestRTTAvg.
func (u *HillClimbingUpdater) shouldShrink(transactionNum uint64, maxCapacity float64, tps uint64, rttAvg float64) bool {
	// toUint64 truncates f like uint64(f) for representable values, and
	// saturates otherwise: NaN and negative values become 0, values beyond the
	// uint64 range become math.MaxUint64. A plain conversion of such values is
	// implementation-dependent in Go, which made the decision below differ
	// between platforms (e.g. an improved RTT could look like a huge "diff").
	toUint64 := func(f float64) uint64 {
		switch {
		case math.IsNaN(f) || f <= 0:
			return 0
		case f >= math.MaxUint64:
			return math.MaxUint64
		default:
			return uint64(f)
		}
	}

	bestMaxCapacity := u.limiter.bestMaxCapacity.Load()
	bestRTTAvg := u.limiter.bestRTTAvg.Load()

	diff := bestMaxCapacity - maxCapacity
	diffPct := toUint64(100.0 * diff / bestMaxCapacity)
	VerboseDebugf("[HillClimbingUpdater] shouldShrink maxCapacity diff: %f, diffPct: %d.", diff, diffPct)

	if diff <= 300 && diffPct <= 10 {
		// diff is acceptable, shouldn't shrink
		return false
	}
	if !(diff > 0 || rttAvg > bestRTTAvg) {
		// Neither a capacity loss nor a slower round: nothing to react to.
		return false
	}

	// The unacceptable diff is due to a lower maxCapacity or a larger rttAvg.
	rttAvgDiff := toUint64(rttAvg - bestRTTAvg)
	// NOTE(review): despite its name this is rttAvg as a percentage of
	// bestRTTAvg (100 means "equal"), not the relative difference, so it meets
	// the thresholds below for most inputs. Kept as is to preserve the tuning;
	// confirm the intent.
	rttAvgDiffPct := toUint64(100.0 * rttAvg / bestRTTAvg)

	// TODO(justxuewei): Hard-coding here is not proper, but it should refactor after testing.
	var (
		rttAvgDiffThreshold    uint64
		rttAvgDiffPctThreshold uint64
	)
	switch {
	case bestRTTAvg < 5:
		rttAvgDiffThreshold, rttAvgDiffPctThreshold = 3, 80
	case bestRTTAvg < 10:
		rttAvgDiffThreshold, rttAvgDiffPctThreshold = 2, 30
	case bestRTTAvg < 50:
		rttAvgDiffThreshold, rttAvgDiffPctThreshold = 5, 20
	case bestRTTAvg < 100:
		rttAvgDiffThreshold, rttAvgDiffPctThreshold = 10, 10
	default:
		rttAvgDiffThreshold, rttAvgDiffPctThreshold = 20, 5
	}

	// bestRTTAvg is a float64 (%f), and both thresholds are logged.
	VerboseDebugf("[HillClimbingUpdater] shouldShrink bestRTTAvg: %f, rttAvgDiff: %d, rttAvgDiffPct: %d, "+
		"rttAvgDiffThreshold: %d, rttAvgDiffPctThreshold: %d.", bestRTTAvg, rttAvgDiff, rttAvgDiffPct,
		rttAvgDiffThreshold, rttAvgDiffPctThreshold)

	return (diffPct > 10 && diff > 5) &&
		(rttAvgDiff > rttAvgDiffThreshold || rttAvgDiffPct >= rttAvgDiffPctThreshold)
}
// adjustLimitation applies option to the limiter's limitation.
//
// The step size grows with the logarithm of the current limitation (factor 1.5
// for the "Plus" options, 0.8 otherwise) and is divided by
// max(1, log2(updateInterval in seconds)), so longer update intervals move the
// limitation more gently. Extend options add the step to the current
// limitation; shrink options subtract it from the best recorded limitation.
// The result is clamped to [1, maxLimitation] and truncated to an integer.
//
// HillClimbingOptionDoNothing leaves the limitation untouched. The returned
// error is always nil.
func (u *HillClimbingUpdater) adjustLimitation(option HillClimbingOption) error {
	if option == HillClimbingOptionDoNothing {
		VerboseDebugf("[HillClimbingUpdater] The option is do nothing, the limitation will not be updated.")
		return nil
	}

	limitation := float64(u.limiter.limitation.Load())
	oldLimitation := limitation
	bestLimitation := float64(u.limiter.bestLimitation.Load())
	updateInterval := u.limiter.updateInterval.Load()

	alpha := 1.5 * math.Log(limitation)
	beta := 0.8 * math.Log(limitation)
	logUpdateInterval := math.Max(1.0, math.Log2(float64(updateInterval.Milliseconds())/1000.0))
	VerboseDebugf("[HillClimbingUpdater] Before calculating new limitation, option: %d, limitation: %f, "+
		"bestLimitation: %f, alpha: %f, beta: %f, logUpdateInterval: %f, updateInterval: %d", option, limitation,
		bestLimitation, alpha, beta, logUpdateInterval, updateInterval.Milliseconds())

	switch option {
	case HillClimbingOptionExtendPlus:
		// The limitation is stored as an integer, so a step below 1 would be
		// truncated away and the limiter could never grow again (e.g. at
		// limitation 1, where the logarithm is 0, or at long update
		// intervals). Always extend by at least 1.
		limitation += math.Max(1.0, alpha/logUpdateInterval)
	case HillClimbingOptionExtend:
		// Same minimum step as above.
		limitation += math.Max(1.0, beta/logUpdateInterval)
	case HillClimbingOptionShrinkPlus:
		limitation = bestLimitation - alpha/logUpdateInterval
	case HillClimbingOptionShrink:
		limitation = bestLimitation - beta/logUpdateInterval
	}

	// Keep the limitation within [1, maxLimitation].
	limitation = math.Max(1.0, math.Min(limitation, float64(maxLimitation)))
	u.limiter.limitation.Store(uint64(limitation))
	VerboseDebugf("[HillClimbingUpdater] The limitation is update from %d to %d.", uint64(oldLimitation), uint64(limitation))
	return nil
}
// shouldDrop reports whether the limiter's lastUpdatedTime no longer equals
// the given snapshot, i.e. whether the round was updated after the snapshot
// was taken; in that case the caller's update is considered stale.
func (u *HillClimbingUpdater) shouldDrop(lastUpdatedTime time.Time) (isDropped bool) {
	current := u.limiter.lastUpdatedTime.Load()
	if current.Equal(lastUpdatedTime) {
		return false
	}
	VerboseDebugf("[HillClimbingUpdater] The limitation is updated by others, drop this update, seq: %d.", u.seq)
	return true
}