blob: 81623eca108ee5e2a5234fe383efd97e02dc5a99 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestLimit(t *testing.T) {
mlc := NewMemoryLimitController(100, 1.0)
for i := 0; i < 101; i++ {
assert.True(t, mlc.TryReserveMemory(1))
}
assert.False(t, mlc.TryReserveMemory(1))
assert.Equal(t, int64(101), mlc.CurrentUsage())
assert.InDelta(t, 1.01, mlc.CurrentUsagePercent(), 0.000001)
mlc.ReleaseMemory(1)
assert.Equal(t, int64(100), mlc.CurrentUsage())
assert.InDelta(t, 1.0, mlc.CurrentUsagePercent(), 0.000001)
assert.True(t, mlc.TryReserveMemory(1))
assert.Equal(t, int64(101), mlc.CurrentUsage())
mlc.ForceReserveMemory(99)
assert.False(t, mlc.TryReserveMemory(1))
assert.Equal(t, int64(200), mlc.CurrentUsage())
assert.InDelta(t, 2.0, mlc.CurrentUsagePercent(), 0.000001)
mlc.ReleaseMemory(50)
assert.False(t, mlc.TryReserveMemory(1))
assert.Equal(t, int64(150), mlc.CurrentUsage())
assert.InDelta(t, 1.5, mlc.CurrentUsagePercent(), 0.000001)
}
func TestDisableLimit(t *testing.T) {
mlc := NewMemoryLimitController(-1, 1.0)
assert.True(t, mlc.TryReserveMemory(1000000))
assert.True(t, mlc.ReserveMemory(context.Background(), 1000000))
mlc.ReleaseMemory(1000000)
assert.Equal(t, int64(1000000), mlc.CurrentUsage())
}
func TestMultiGoroutineTryReserveMem(t *testing.T) {
mlc := NewMemoryLimitController(10000, 1.0)
// Multi goroutine try reserve memory.
wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
for i := 0; i < 1000; i++ {
assert.True(t, mlc.TryReserveMemory(1))
}
wg.Done()
}()
}
assert.True(t, mlc.TryReserveMemory(1))
wg.Wait()
assert.False(t, mlc.TryReserveMemory(1))
assert.Equal(t, int64(10001), mlc.CurrentUsage())
assert.InDelta(t, 1.0001, mlc.CurrentUsagePercent(), 0.000001)
}
func TestReserveWithContext(t *testing.T) {
mlc := NewMemoryLimitController(100, 1.0)
assert.True(t, mlc.TryReserveMemory(101))
gorNum := 10
// Reserve ctx timeout
waitGroup := sync.WaitGroup{}
waitGroup.Add(gorNum)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
for i := 0; i < gorNum; i++ {
go func() {
assert.False(t, mlc.ReserveMemory(ctx, 1))
waitGroup.Done()
}()
}
waitGroup.Wait()
assert.Equal(t, int64(101), mlc.CurrentUsage())
// Reserve ctx cancel
waitGroup.Add(gorNum)
cancelCtx, cancel := context.WithCancel(context.Background())
for i := 0; i < gorNum; i++ {
go func() {
assert.False(t, mlc.ReserveMemory(cancelCtx, 1))
waitGroup.Done()
}()
}
cancel()
waitGroup.Wait()
assert.Equal(t, int64(101), mlc.CurrentUsage())
}
func TestBlocking(t *testing.T) {
mlc := NewMemoryLimitController(100, 1.0)
assert.True(t, mlc.TryReserveMemory(101))
assert.Equal(t, int64(101), mlc.CurrentUsage())
assert.InDelta(t, 1.01, mlc.CurrentUsagePercent(), 0.000001)
gorNum := 10
chs := make([]chan int, gorNum)
for i := 0; i < gorNum; i++ {
chs[i] = make(chan int, 1)
go reserveMemory(mlc, chs[i])
}
// The threads are blocked since the quota is full
for i := 0; i < gorNum; i++ {
assert.False(t, awaitCh(chs[i]))
}
assert.Equal(t, int64(101), mlc.CurrentUsage())
mlc.ReleaseMemory(int64(gorNum))
for i := 0; i < gorNum; i++ {
assert.True(t, awaitCh(chs[i]))
}
assert.Equal(t, int64(101), mlc.CurrentUsage())
}
func TestStepRelease(t *testing.T) {
mlc := NewMemoryLimitController(100, 1.0)
assert.True(t, mlc.TryReserveMemory(101))
assert.Equal(t, int64(101), mlc.CurrentUsage())
assert.InDelta(t, 1.01, mlc.CurrentUsagePercent(), 0.000001)
gorNum := 10
ch := make(chan int, 1)
for i := 0; i < gorNum; i++ {
go reserveMemory(mlc, ch)
}
// The threads are blocked since the quota is full
assert.False(t, awaitCh(ch))
assert.Equal(t, int64(101), mlc.CurrentUsage())
for i := 0; i < gorNum; i++ {
mlc.ReleaseMemory(1)
assert.True(t, awaitCh(ch))
assert.False(t, awaitCh(ch))
}
assert.Equal(t, int64(101), mlc.CurrentUsage())
}
func TestRegisterTrigger(t *testing.T) {
mlc := NewMemoryLimitController(100, 0.95)
triggeredResult1 := false
triggeredResult2 := false
finishCh := make(chan struct{}, 2)
mlc.RegisterTrigger(func() {
triggeredResult1 = true
finishCh <- struct{}{}
})
mlc.RegisterTrigger(func() {
triggeredResult2 = true
finishCh <- struct{}{}
})
mlc.TryReserveMemory(50)
timer := time.NewTimer(time.Millisecond * 500)
select {
case <-finishCh:
assert.Fail(t, "should not be triggered")
case <-timer.C:
}
mlc.TryReserveMemory(45)
timer.Reset(time.Millisecond * 500)
for i := 0; i < 2; i++ {
select {
case <-finishCh:
case <-timer.C:
assert.Fail(t, "trigger timeout")
}
}
assert.True(t, triggeredResult1)
assert.True(t, triggeredResult2)
triggeredResult2 = false
mlc.ReleaseMemory(1)
mlc.ForceReserveMemory(1)
timer.Reset(time.Millisecond * 500)
for i := 0; i < 2; i++ {
select {
case <-finishCh:
case <-timer.C:
assert.Fail(t, "trigger timeout")
}
}
assert.True(t, triggeredResult2)
}
func reserveMemory(mlc MemoryLimitController, ch chan int) {
mlc.ReserveMemory(context.Background(), 1)
ch <- 1
}
func awaitCh(ch chan int) bool {
select {
case <-ch:
return true
case <-time.After(100 * time.Millisecond):
return false
}
}