blob: fb695e4da34ba2071d581dff9a53d7d9877b1eb9 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import (
"errors"
"sync"
"testing"
"time"
)
import (
"github.com/cenkalti/backoff/v4"
"go.uber.org/atomic"
)
func TestOrdering(t *testing.T) {
numValues := 1000
q := NewQueue(1 * time.Microsecond)
stop := make(chan struct{})
defer close(stop)
wg := sync.WaitGroup{}
wg.Add(numValues)
mu := sync.Mutex{}
out := make([]int, 0)
for i := 0; i < numValues; i++ {
i := i
q.Push(func() error {
mu.Lock()
out = append(out, i)
defer mu.Unlock()
wg.Done()
return nil
})
// Start the queue at the halfway point.
if i == numValues/2 {
go q.Run(stop)
}
}
// wait for all task processed
wg.Wait()
if len(out) != numValues {
t.Fatalf("expected output array length %d to equal %d", len(out), numValues)
}
for i := 0; i < numValues; i++ {
if i != out[i] {
t.Fatalf("expected out[%d] %v to equal %v", i, out[i], i)
}
}
}
func TestRetry(t *testing.T) {
q := NewQueue(1 * time.Microsecond)
stop := make(chan struct{})
defer close(stop)
// Push a task that fails the first time and retries.
wg := sync.WaitGroup{}
wg.Add(2)
failed := false
q.Push(func() error {
defer wg.Done()
if failed {
return nil
}
failed = true
return errors.New("fake error")
})
go q.Run(stop)
// wait for the task to run twice.
wg.Wait()
}
func TestRetryWithBackoff(t *testing.T) {
ebf := backoff.NewExponentialBackOff()
ebf.InitialInterval = 1 * time.Microsecond
ebf.MaxInterval = 5 * time.Microsecond
q := NewBackOffQueue(ebf)
stop := make(chan struct{})
defer close(stop)
// Push a task that fails the first time and retries.
wg := sync.WaitGroup{}
wg.Add(2)
failed := false
q.Push(func() error {
defer wg.Done()
if failed {
return nil
}
failed = true
return errors.New("fake error")
})
go q.Run(stop)
// wait for the task to run twice.
wg.Wait()
}
func TestResourceFree(t *testing.T) {
q := NewQueue(1 * time.Microsecond)
stop := make(chan struct{})
signal := make(chan struct{})
go func() {
q.Run(stop)
signal <- struct{}{}
}()
q.Push(func() error {
t.Log("mock exec")
return nil
})
// mock queue block wait cond signal
time.AfterFunc(10*time.Millisecond, func() {
close(stop)
})
select {
case <-time.After(200 * time.Millisecond):
t.Error("close stop, method exit timeout.")
case <-signal:
t.Log("queue return.")
}
}
func TestClosed(t *testing.T) {
t.Run("immediate close", func(t *testing.T) {
stop := make(chan struct{})
q := NewQueue(0)
go q.Run(stop)
close(stop)
if err := WaitForClose(q, 10*time.Second); err != nil {
t.Error(err)
}
})
t.Run("no tasks after close", func(t *testing.T) {
stop := make(chan struct{})
q := NewQueue(0)
taskComplete := atomic.NewBool(false)
q.Push(func() error {
close(stop)
return nil
})
go q.Run(stop)
if err := WaitForClose(q, 10*time.Second); err != nil {
t.Error()
}
q.Push(func() error {
taskComplete.Store(true)
return nil
})
if taskComplete.Load() {
t.Error("task ran on closed queue")
}
})
}