blob: 5ead9f5b9a947d849ca78440e8dbf3a877eb6824 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"context"
"sync"
"sync/atomic"
)
// MemoryLimitController tracks an amount of reserved memory against an
// optional limit and runs registered callbacks when usage crosses a threshold.
// The notes on each method describe the memoryLimitController implementation
// in this file.
type MemoryLimitController interface {
// ReserveMemory reserves size, waiting while reservations are being refused.
// It returns true once the reservation is made and false when the wait on ctx
// reports failure (presumably because ctx is done; the wait helper is defined
// elsewhere).
ReserveMemory(ctx context.Context, size int64) bool
// TryReserveMemory reserves size without waiting and reports whether it
// succeeded. It is refused only when a limit is set and the usage is already
// above it, so a single reservation may push usage past the limit.
TryReserveMemory(size int64) bool
// ForceReserveMemory adds size to the usage unconditionally, ignoring the limit.
ForceReserveMemory(size int64)
// ReleaseMemory subtracts size from the usage and broadcasts to waiters when
// usage moves from above the limit to at or below it.
ReleaseMemory(size int64)
// CurrentUsage returns the amount currently reserved.
CurrentUsage() int64
// CurrentUsagePercent returns the usage as a fraction of the limit
// (1.0 means usage equals the limit; it is not scaled to 0-100).
CurrentUsagePercent() float64
// IsMemoryLimited reports whether a limit is in effect (limit > 0).
IsMemoryLimited() bool
// RegisterTrigger adds a callback to run when a reservation raises the usage
// from below the trigger threshold to at or above it.
RegisterTrigger(trigger func())
}
// memoryLimitController is the MemoryLimitController implementation returned
// by NewMemoryLimitController.
type memoryLimitController struct {
// limit is the usage ceiling; a value <= 0 means no limit is enforced.
limit int64
// chCond is what ReserveMemory waits on and ReleaseMemory broadcasts to.
// Its lock L is also held while RegisterTrigger appends to triggers.
chCond *chCond
// currentUsage is the amount currently reserved; it is read and written
// through sync/atomic.
currentUsage int64
// triggers holds the callbacks added through RegisterTrigger.
triggers []*thresholdTrigger
// triggerThreshold is the fraction of limit at which triggers are started.
// valid range is (0, 1.0)
triggerThreshold float64
}
// thresholdTrigger pairs a registered callback with a flag that records
// whether the callback is currently claimed for execution.
type thresholdTrigger struct {
	// triggerFunc is the callback to invoke.
	triggerFunc func()
	// triggerRunning is 1 while the callback is claimed and 0 otherwise; it is
	// only touched through sync/atomic.
	triggerRunning int32
}

// canTryRunning atomically flips the flag from 0 to 1 and reports whether this
// call was the one that flipped it, i.e. whether the caller now owns the run.
func (t *thresholdTrigger) canTryRunning() bool {
	claimed := atomic.CompareAndSwapInt32(&t.triggerRunning, 0, 1)
	return claimed
}

// setRunning atomically overwrites the flag: 1 when isRunning is true,
// 0 otherwise.
func (t *thresholdTrigger) setRunning(isRunning bool) {
	var state int32
	if isRunning {
		state = 1
	}
	atomic.StoreInt32(&t.triggerRunning, state)
}
// NewMemoryLimitController returns a controller with the given limit and
// trigger threshold and zero initial usage.
//
// A limit <= 0 means no limit is enforced. threshold is the fraction of limit
// at which registered triggers are started; its valid range is (0, 1.0).
func NewMemoryLimitController(limit int64, threshold float64) MemoryLimitController {
	return &memoryLimitController{
		triggerThreshold: threshold,
		chCond:           newCond(&sync.Mutex{}),
		limit:            limit,
	}
}
// ReserveMemory reserves size, waiting for as long as TryReserveMemory keeps
// refusing the request.
//
// The first attempt is made without taking the lock. If it is refused, the
// method holds chCond.L and alternates between retrying and waiting on chCond.
// It returns true once a reservation succeeds and false as soon as
// waitWithContext(ctx) reports false (presumably because ctx is done; confirm
// against the chCond implementation).
func (m *memoryLimitController) ReserveMemory(ctx context.Context, size int64) bool {
	// Fast path: nothing to wait for.
	if m.TryReserveMemory(size) {
		return true
	}

	m.chCond.L.Lock()
	defer m.chCond.L.Unlock()

	for {
		if m.TryReserveMemory(size) {
			return true
		}
		if !m.chCond.waitWithContext(ctx) {
			return false
		}
	}
}
// TryReserveMemory adds size to the usage without waiting and reports whether
// the reservation was made.
//
// The limit check looks only at the usage before size is added, so one
// reservation may carry the total past the limit; once usage is above the
// limit, further reservations are refused. The update is a compare-and-swap
// that is retried when another goroutine changed the usage in between. After a
// successful swap checkTrigger is called with the old and new usage.
func (m *memoryLimitController) TryReserveMemory(size int64) bool {
	for {
		before := atomic.LoadInt64(&m.currentUsage)

		// Already over the limit: an earlier request was allowed to overshoot.
		if m.IsMemoryLimited() && before > m.limit {
			return false
		}

		after := before + size
		if !atomic.CompareAndSwapInt64(&m.currentUsage, before, after) {
			// Lost the race with a concurrent update; reload and retry.
			continue
		}
		m.checkTrigger(before, after)
		return true
	}
}
// ForceReserveMemory adds size to the usage unconditionally (the limit is not
// consulted) and then calls checkTrigger with the usage before and after.
func (m *memoryLimitController) ForceReserveMemory(size int64) {
	after := atomic.AddInt64(&m.currentUsage, size)
	// AddInt64 returns the new total, so the previous one is recovered by
	// subtracting what was just added.
	m.checkTrigger(after-size, after)
}
// ReleaseMemory subtracts size from the usage. When that moves the usage from
// above the limit to at or below it, chCond is broadcast so that callers
// waiting in ReserveMemory retry.
func (m *memoryLimitController) ReleaseMemory(size int64) {
	after := atomic.AddInt64(&m.currentUsage, -size)
	before := after + size

	// Only the release that crosses the limit downwards needs to signal.
	if before <= m.limit || after > m.limit {
		return
	}
	m.chCond.broadcast()
}
// CurrentUsage returns the amount currently reserved, read atomically.
func (m *memoryLimitController) CurrentUsage() int64 {
	usage := atomic.LoadInt64(&m.currentUsage)
	return usage
}
// CurrentUsagePercent returns the current usage as a fraction of the limit
// (1.0 means usage equals the limit; the value is not scaled to 0-100).
//
// When no limit is in effect (limit <= 0) it returns 0. Dividing by a zero or
// negative limit would otherwise yield NaN, ±Inf or a negative ratio, none of
// which is a meaningful usage figure.
func (m *memoryLimitController) CurrentUsagePercent() float64 {
	if !m.IsMemoryLimited() {
		return 0
	}
	return float64(atomic.LoadInt64(&m.currentUsage)) / float64(m.limit)
}
// IsMemoryLimited reports whether a limit is in effect, which is the case only
// for a strictly positive limit.
func (m *memoryLimitController) IsMemoryLimited() bool {
	unlimited := m.limit <= 0
	return !unlimited
}
// RegisterTrigger wraps trigger in a thresholdTrigger (initially not running)
// and appends it to the controller's trigger list while holding chCond.L.
func (m *memoryLimitController) RegisterTrigger(trigger func()) {
	entry := &thresholdTrigger{triggerFunc: trigger}

	m.chCond.L.Lock()
	defer m.chCond.L.Unlock()

	m.triggers = append(m.triggers, entry)
}
// checkTrigger starts the registered triggers when a reservation moves the
// usage from below triggerThreshold (as a fraction of limit) to at or above it.
//
// prevUsage and nextUsage are the usage before and after the reservation.
// Each trigger runs in its own goroutine. A trigger whose previous run has not
// finished yet is skipped rather than started a second time.
//
// NOTE(review): m.triggers is read here without chCond.L, while
// RegisterTrigger appends to it under that lock, so registering a trigger
// concurrently with reservations is unsynchronised. The lock cannot simply be
// taken here: ReserveMemory calls TryReserveMemory, and therefore this method,
// while already holding chCond.L.
func (m *memoryLimitController) checkTrigger(prevUsage int64, nextUsage int64) {
	// With no positive limit the ratios below are divisions by zero (NaN/±Inf)
	// or have a flipped sign, so there is no meaningful threshold to cross.
	if !m.IsMemoryLimited() {
		return
	}

	limit := float64(m.limit)
	nextUsagePercent := float64(nextUsage) / limit
	prevUsagePercent := float64(prevUsage) / limit

	// Fire only on the upward crossing, not on every reservation above it.
	if prevUsagePercent >= m.triggerThreshold || nextUsagePercent < m.triggerThreshold {
		return
	}

	for _, trigger := range m.triggers {
		if !trigger.canTryRunning() {
			continue
		}
		go func(tt *thresholdTrigger) {
			// Deferred so the running flag is cleared even if triggerFunc ends
			// the goroutine early (e.g. via runtime.Goexit); otherwise the
			// trigger could never be started again.
			defer tt.setRunning(false)
			tt.triggerFunc()
		}(trigger)
	}
}