blob: 1a9d658bf884ecb84bc7aef7c19f3770b46d5cf7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package pool
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)
// Test_newExecutor builds an executor through newExecutor and compares it
// with the expected value of each test case. Because the work item channel is
// created inside newExecutor, a case may supply a manipulator that copies such
// values from the result into the expectation before the comparison.
func Test_newExecutor(t *testing.T) {
	type args struct {
		queueDepth             int
		numberOfInitialWorkers int
		log                    zerolog.Logger
	}
	type testCase struct {
		name string
		args args
		want *executor
		// manipulator, when set, may adjust want using values only known from got.
		manipulator func(t *testing.T, want *executor, got *executor)
	}
	cases := []testCase{
		{
			name: "just create it",
			want: &executor{
				worker: []*worker{},
			},
			manipulator: func(t *testing.T, want *executor, got *executor) {
				// The channel must exist; its identity cannot be predicted, so adopt it.
				assert.NotNil(t, got.workItems)
				want.workItems = got.workItems
			},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			a := tc.args
			got := newExecutor(a.queueDepth, a.numberOfInitialWorkers, a.log)
			expected := tc.want
			if adjust := tc.manipulator; adjust != nil {
				adjust(t, expected, got)
			}
			assert.Equalf(t, expected, got, "newExecutor(%v, %v, %v)", a.queueDepth, a.numberOfInitialWorkers, a.log)
		})
	}
}
// Test_executor_Close calls Close on an executor assembled from the case's
// fields and hands the returned error to the case's error assertion.
func Test_executor_Close(t *testing.T) {
	type fields struct {
		running      bool
		shutdown     bool
		worker       []*worker
		workItems    chan workItem
		traceWorkers bool
	}
	type testCase struct {
		name    string
		fields  fields
		wantErr assert.ErrorAssertionFunc
	}
	cases := []testCase{
		{
			name:    "close it",
			wantErr: assert.NoError,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			f := tc.fields
			sut := &executor{
				running:      f.running,
				shutdown:     f.shutdown,
				worker:       f.worker,
				workItems:    f.workItems,
				traceWorkers: f.traceWorkers,
				log:          produceTestingLogger(t),
			}
			closeErr := sut.Close()
			tc.wantErr(t, closeErr, fmt.Sprintf("Close()"))
		})
	}
}
// Test_executor_IsRunning checks the value reported by IsRunning for an
// executor assembled from the case's fields.
func Test_executor_IsRunning(t *testing.T) {
	type fields struct {
		running      bool
		shutdown     bool
		worker       []*worker
		workItems    chan workItem
		traceWorkers bool
	}
	type testCase struct {
		name   string
		fields fields
		want   bool
	}
	cases := []testCase{
		// All fields are left at their zero value, and the expectation is false.
		{name: "no"},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			f := tc.fields
			sut := &executor{
				running:      f.running,
				shutdown:     f.shutdown,
				worker:       f.worker,
				workItems:    f.workItems,
				traceWorkers: f.traceWorkers,
				log:          produceTestingLogger(t),
			}
			got := sut.IsRunning()
			assert.Equalf(t, tc.want, got, "IsRunning()")
		})
	}
}
// Test_executor_Start calls Start on executors in different initial states
// and checks what IsRunning reports afterwards.
func Test_executor_Start(t *testing.T) {
	type fields struct {
		running      bool
		shutdown     bool
		worker       []*worker
		queue        chan workItem
		traceWorkers bool
	}
	type testCase struct {
		name      string
		fields    fields
		shouldRun bool
	}
	cases := []testCase{
		{
			name:      "Start fresh",
			shouldRun: true,
		},
		{
			name:      "Start running",
			fields:    fields{running: true},
			shouldRun: true,
		},
		{
			name:      "Start stopping",
			fields:    fields{running: true, shutdown: true},
			shouldRun: false,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			f := tc.fields
			sut := &executor{
				running:      f.running,
				shutdown:     f.shutdown,
				worker:       f.worker,
				workItems:    f.queue,
				traceWorkers: f.traceWorkers,
			}
			sut.Start()
			isRunning := sut.IsRunning()
			assert.Equal(t, tc.shouldRun, isRunning, "should be running")
		})
	}
}
// Test_executor_Stop calls Stop on executors in different initial states and
// verifies that IsRunning afterwards matches the shouldRun expectation of the
// test case.
func Test_executor_Stop(t *testing.T) {
	type fields struct {
		running      bool
		shutdown     bool
		worker       []*worker
		queue        chan workItem
		traceWorkers bool
	}
	tests := []struct {
		name      string
		fields    fields
		shouldRun bool
	}{
		{
			name:      "Stop stopped",
			shouldRun: false,
		},
		{
			name: "Stop running",
			fields: fields{
				running: true,
				queue:   make(chan workItem),
				worker: []*worker{
					// A worker that has been initialized but not started.
					func() *worker {
						w := &worker{}
						w.initialize()
						return w
					}(),
				},
			},
			shouldRun: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			e := &executor{
				running:      tt.fields.running,
				shutdown:     tt.fields.shutdown,
				worker:       tt.fields.worker,
				workItems:    tt.fields.queue,
				traceWorkers: tt.fields.traceWorkers,
			}
			e.Stop()
			// Check the declared expectation; without this the test could only
			// fail by panicking or hanging inside Stop.
			assert.Equal(t, tt.shouldRun, e.IsRunning(), "running state after Stop()")
		})
	}
}
// Test_executor_Submit starts an executor assembled from the case's fields,
// submits the case's runnable and lets the case's validator inspect the
// returned CompletionFuture. Cases with waitForCompletion set first wait for
// the future to complete.
func Test_executor_Submit(t *testing.T) {
	type fields struct {
		running      bool
		shutdown     bool
		worker       []*worker
		queue        chan workItem
		traceWorkers bool
	}
	type args struct {
		workItemId int32
		runnable   Runnable
		context    context.Context
	}
	// storedError returns the error held by the future, or nil when none has
	// been stored. The comma-ok assertion keeps a missing error from panicking
	// with an interface conversion error, so the validator can report a normal
	// assertion failure instead.
	storedError := func(completionFuture CompletionFuture) error {
		err, _ := completionFuture.(*future).err.Load().(error)
		return err
	}
	tests := []struct {
		name                      string
		fields                    fields
		args                      args
		completionFutureValidator func(t *testing.T, completionFuture CompletionFuture) bool
		waitForCompletion         bool
	}{
		{
			name: "submitting nothing",
			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
				return assert.Error(t, storedError(completionFuture))
			},
		},
		{
			name: "submit canceled",
			fields: fields{
				queue: make(chan workItem),
			},
			args: args{
				workItemId: 13,
				runnable: func() {
					// We do something for 3 seconds
					time.Sleep(3 * time.Second)
				},
				// The context is already canceled when Submit is called.
				context: func() context.Context {
					ctx, cancelFunc := context.WithCancel(context.Background())
					cancelFunc()
					return ctx
				}(),
			},
			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
				return assert.Error(t, storedError(completionFuture))
			},
		},
		{
			name: "Submit something which doesn't complete",
			fields: fields{
				queue: make(chan workItem, 1),
			},
			args: args{
				workItemId: 13,
				runnable: func() {
					// We do something for 3 seconds
					time.Sleep(3 * time.Second)
				},
				context: t.Context(),
			},
			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
				completed := completionFuture.(*future).completed.Load()
				return assert.False(t, completed)
			},
		},
		{
			name: "Submit something which does complete",
			// Borrow the state of a fixed size executor for the executor under test.
			fields: func() fields {
				fixed := NewFixedSizeExecutor(1, 1).(*executor)
				return fields{
					running:      fixed.running,
					shutdown:     fixed.shutdown,
					worker:       fixed.worker,
					queue:        fixed.workItems,
					traceWorkers: true,
				}
			}(),
			args: args{
				workItemId: 13,
				runnable: func() {
					// NOOP
				},
				context: t.Context(),
			},
			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
				completed := completionFuture.(*future).completed.Load()
				return assert.True(t, completed)
			},
			waitForCompletion: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			e := &executor{
				running:      tt.fields.running,
				shutdown:     tt.fields.shutdown,
				worker:       tt.fields.worker,
				workItems:    tt.fields.queue,
				traceWorkers: tt.fields.traceWorkers,
			}
			e.Start()
			completionFuture := e.Submit(tt.args.context, tt.args.workItemId, tt.args.runnable)
			if tt.waitForCompletion {
				assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
			}
			assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
		})
	}
}
// Test_executor_getWorkerWaitGroup compares the wait group returned by
// getWorkerWaitGroup with the expected one of each test case.
func Test_executor_getWorkerWaitGroup(t *testing.T) {
	type fields struct {
		running      bool
		shutdown     bool
		worker       []*worker
		workItems    chan workItem
		traceWorkers bool
	}
	type testCase struct {
		name   string
		fields fields
		want   *sync.WaitGroup
	}
	cases := []testCase{
		{
			name: "get it",
			want: &sync.WaitGroup{},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			f := tc.fields
			sut := &executor{
				running:      f.running,
				shutdown:     f.shutdown,
				worker:       f.worker,
				workItems:    f.workItems,
				traceWorkers: f.traceWorkers,
				log:          produceTestingLogger(t),
			}
			got := sut.getWorkerWaitGroup()
			assert.Equalf(t, tc.want, got, "getWorkerWaitGroup()")
		})
	}
}
// Test_executor_getWorksItems compares the channel returned by getWorksItems
// with the expected one of each test case.
func Test_executor_getWorksItems(t *testing.T) {
	type fields struct {
		running      bool
		shutdown     bool
		worker       []*worker
		workItems    chan workItem
		traceWorkers bool
	}
	type testCase struct {
		name   string
		fields fields
		want   chan workItem
	}
	cases := []testCase{
		// No channel is configured, and the expectation is a nil channel.
		{name: "get it"},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			f := tc.fields
			sut := &executor{
				running:      f.running,
				shutdown:     f.shutdown,
				worker:       f.worker,
				workItems:    f.workItems,
				traceWorkers: f.traceWorkers,
				log:          produceTestingLogger(t),
			}
			got := sut.getWorksItems()
			assert.Equalf(t, tc.want, got, "getWorksItems()")
		})
	}
}
// Test_executor_isTraceWorkers compares the value returned by isTraceWorkers
// with the expected one of each test case.
func Test_executor_isTraceWorkers(t *testing.T) {
	type fields struct {
		running      bool
		shutdown     bool
		worker       []*worker
		workItems    chan workItem
		traceWorkers bool
	}
	type testCase struct {
		name   string
		fields fields
		want   bool
	}
	cases := []testCase{
		// traceWorkers is left unset, and the expectation is false.
		{name: "it is not"},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			f := tc.fields
			sut := &executor{
				running:      f.running,
				shutdown:     f.shutdown,
				worker:       f.worker,
				workItems:    f.workItems,
				traceWorkers: f.traceWorkers,
				log:          produceTestingLogger(t),
			}
			got := sut.isTraceWorkers()
			assert.Equalf(t, tc.want, got, "isTraceWorkers()")
		})
	}
}
// Test_executor_String renders an executor holding a single worker through
// String() and compares the output with the expected box drawing.
func Test_executor_String(t *testing.T) {
	type fields struct {
		running      bool
		shutdown     bool
		worker       []*worker
		workItems    chan workItem
		traceWorkers bool
	}
	type testCase struct {
		name   string
		fields fields
		want   string
	}
	// zeroTime wraps the zero time.Time in an atomic.Value for the worker's
	// lastReceived field.
	zeroTime := func() atomic.Value {
		var v atomic.Value
		v.Store(time.Time{})
		return v
	}
	cases := []testCase{
		{
			name: "string it",
			fields: fields{
				running:  true,
				shutdown: true,
				worker: []*worker{
					{
						id:           1,
						shutdown:     atomic.Bool{},
						interrupted:  atomic.Bool{},
						lastReceived: zeroTime(),
					},
				},
				traceWorkers: true,
			},
			// The literal starts with a newline for readability; [1:] removes it again.
			want: `
╔═executor═════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║╔═running╗╔═shutdown╗ ║
║║b1 true ║║ b1 true ║ ║
║╚════════╝╚═════════╝ ║
║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗╔═workItems══╗║
║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║0 element(s)║║
║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║╚════════════╝║
║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║ ║
║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║
║╔═traceWorkers╗ ║
║║ b1 true ║ ║
║╚═════════════╝ ║
╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			f := tc.fields
			sut := &executor{
				running:      f.running,
				shutdown:     f.shutdown,
				worker:       f.worker,
				workItems:    f.workItems,
				traceWorkers: f.traceWorkers,
				log:          produceTestingLogger(t),
			}
			rendered := sut.String()
			assert.Equalf(t, tc.want, rendered, "String()")
		})
	}
}