blob: 484894d660a73b5597df95eb36764ac1d0c84e88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package transactions
import (
"container/list"
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
)
func TestNewRequestTransactionManager(t *testing.T) {
type args struct {
numberOfConcurrentRequests int
requestTransactionManagerOptions []options.WithOption
}
tests := []struct {
name string
args args
setup func(t *testing.T, args *args)
wantAssert func(*testing.T, RequestTransactionManager) bool
}{
{
name: "just create one",
wantAssert: func(t *testing.T, rtm RequestTransactionManager) bool {
require.IsType(t, &requestTransactionManager{}, rtm)
rtmi := rtm.(*requestTransactionManager)
assert.NotNil(t, rtm)
assert.NotNil(t, rtmi.executor)
assert.Same(t, rtmi.executor, sharedExecutorInstance)
assert.NotNil(t, rtmi.workLog)
assert.NotNil(t, rtmi.ctx)
assert.NotNil(t, rtmi.cancelCtx)
return true
},
},
{
name: "just create one with option",
args: args{
numberOfConcurrentRequests: 2,
requestTransactionManagerOptions: []options.WithOption{
WithCustomExecutor(sharedExecutorInstance),
},
},
wantAssert: func(t *testing.T, rtm RequestTransactionManager) bool {
require.IsType(t, &requestTransactionManager{}, rtm)
rtmi := rtm.(*requestTransactionManager)
assert.NotNil(t, rtm)
assert.NotNil(t, rtmi.executor)
assert.Same(t, rtmi.executor, sharedExecutorInstance)
assert.NotNil(t, rtmi.workLog)
assert.NotNil(t, rtmi.ctx)
assert.NotNil(t, rtmi.cancelCtx)
assert.Equal(t, 2, rtmi.numberOfConcurrentRequests)
return true
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.args)
}
got := NewRequestTransactionManager(tt.args.numberOfConcurrentRequests, tt.args.requestTransactionManagerOptions...)
if !assert.True(t, tt.wantAssert(t, got)) {
t.Errorf("NewRequestTransactionManager() = %v", got)
}
})
}
}
func TestWithCustomExecutor(t *testing.T) {
type args struct {
executor pool.Executor
}
tests := []struct {
name string
args args
want options.WithOption
}{
{
name: "with a option",
args: args{
executor: sharedExecutorInstance,
},
want: WithCustomExecutor(sharedExecutorInstance),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
customExecutor := WithCustomExecutor(tt.args.executor)
assert.NotNil(t, customExecutor)
})
}
}
func Test_requestTransactionManager_SetNumberOfConcurrentRequests(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
}
type args struct {
numberOfConcurrentRequests int
}
tests := []struct {
name string
fields fields
args args
}{
{
name: "set a number",
},
{
name: "set a number on running requests",
fields: fields{
runningRequests: []*requestTransaction{
{}, // empty one is sufficient
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
r.SetNumberOfConcurrentRequests(tt.args.numberOfConcurrentRequests)
})
}
}
func Test_requestTransactionManager_StartTransaction(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
traceTransactionManagerTransactions bool
}
tests := []struct {
name string
fields fields
setup func(t *testing.T, fields *fields)
manipulator func(t *testing.T, manager *requestTransactionManager)
wantAssert func(t *testing.T, requestTransaction RequestTransaction) bool
}{
{
name: "start one",
setup: func(t *testing.T, fields *fields) {
fields.executor = pool.NewFixedSizeExecutor(10, 10, options.WithCustomLogger(produceTestingLogger(t)))
},
wantAssert: func(t *testing.T, requestTransaction RequestTransaction) bool {
assert.False(t, requestTransaction.IsCompleted())
return true
},
},
{
name: "start one in shutdown",
setup: func(t *testing.T, fields *fields) {
fields.executor = pool.NewFixedSizeExecutor(10, 10, options.WithCustomLogger(produceTestingLogger(t)))
},
manipulator: func(t *testing.T, manager *requestTransactionManager) {
manager.shutdown.Store(true)
},
wantAssert: func(t *testing.T, requestTransaction RequestTransaction) bool {
assert.True(t, requestTransaction.IsCompleted())
assert.Error(t, requestTransaction.AwaitCompletion(context.Background()))
return true
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields)
}
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
log: produceTestingLogger(t),
}
if tt.manipulator != nil {
tt.manipulator(t, r)
}
if got := r.StartTransaction(); !assert.True(t, tt.wantAssert(t, got)) {
t.Errorf("StartTransaction() = %v", got)
}
})
}
}
func Test_requestTransactionManager_endRequest(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
}
type args struct {
transaction *requestTransaction
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
wantErr bool
}{
{
name: "end request with unknown transaction",
args: args{
transaction: &requestTransaction{},
},
setup: func(t *testing.T, fields *fields, args *args) {
fields.executor = pool.NewFixedSizeExecutor(10, 10, options.WithCustomLogger(produceTestingLogger(t)))
},
wantErr: true,
},
{
name: "end request",
args: args{
transaction: &requestTransaction{},
},
setup: func(t *testing.T, fields *fields, args *args) {
fields.executor = pool.NewFixedSizeExecutor(10, 10, options.WithCustomLogger(produceTestingLogger(t)))
},
fields: fields{
runningRequests: []*requestTransaction{
{},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
if err := r.endRequest(tt.args.transaction); (err != nil) != tt.wantErr {
t.Errorf("endRequest() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func Test_requestTransactionManager_failRequest(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
}
type args struct {
transaction *requestTransaction
err error
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
wantErr bool
}{
{
name: "fail a request",
args: args{
transaction: &requestTransaction{},
},
setup: func(t *testing.T, fields *fields, args *args) {
fields.executor = pool.NewFixedSizeExecutor(10, 10, options.WithCustomLogger(produceTestingLogger(t)))
completionFutureMock := NewMockCompletionFuture(t)
expect := completionFutureMock.EXPECT()
expect.Cancel(true, nil).Return()
var completionFuture pool.CompletionFuture = completionFutureMock
args.transaction.completionFuture.Store(&completionFuture)
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
log: produceTestingLogger(t),
}
if err := r.failRequest(tt.args.transaction, tt.args.err); (err != nil) != tt.wantErr {
t.Errorf("failRequest() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func Test_requestTransactionManager_getNumberOfActiveRequests(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
}
tests := []struct {
name string
fields fields
want int
}{
{
name: "get em",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
if got := r.getNumberOfActiveRequests(); got != tt.want {
t.Errorf("getNumberOfActiveRequests() = %v, want %v", got, tt.want)
}
})
}
}
func Test_requestTransactionManager_processWorklog(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
}
tests := []struct {
name string
fields fields
setup func(t *testing.T, fields *fields)
}{
{
name: "process nothing",
},
{
name: "process one",
fields: fields{
numberOfConcurrentRequests: 100,
workLog: func() list.List {
l := list.New()
l.PushBack(&requestTransaction{})
return *l
}(),
executor: sharedExecutorInstance,
},
setup: func(t *testing.T, fields *fields) {
fields.executor = pool.NewFixedSizeExecutor(10, 10, options.WithCustomLogger(produceTestingLogger(t)))
},
},
{
name: "process two",
fields: fields{
numberOfConcurrentRequests: 100,
workLog: func() list.List {
l := list.New()
var completionFuture pool.CompletionFuture = NewMockCompletionFuture(t)
r1 := &requestTransaction{
transactionId: 1,
}
r1.completionFuture.Store(&completionFuture)
l.PushBack(r1)
r2 := &requestTransaction{
transactionId: 2,
}
r2.completionFuture.Store(&completionFuture)
l.PushBack(r2)
return *l
}(),
executor: sharedExecutorInstance,
},
setup: func(t *testing.T, fields *fields) {
fields.executor = pool.NewFixedSizeExecutor(10, 10, options.WithCustomLogger(produceTestingLogger(t)))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields)
}
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
r.processWorklog()
})
}
}
func Test_requestTransactionManager_submitTransaction(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
}
type args struct {
handle *requestTransaction
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
}{
{
name: "submit it",
args: args{
handle: &requestTransaction{
operation: func(context.Context) {
// doesn't matter
},
},
},
setup: func(t *testing.T, fields *fields, args *args) {
fields.executor = pool.NewFixedSizeExecutor(10, 10, options.WithCustomLogger(produceTestingLogger(t)))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
r.submitTransaction(tt.args.handle)
})
}
}
func Test_requestTransactionManager_Close(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
ctx context.Context
cancelCtx context.CancelFunc
traceTransactionManagerTransactions bool
}
tests := []struct {
name string
fields fields
setup func(t *testing.T, fields *fields)
wantErr assert.ErrorAssertionFunc
}{
{
name: "close it",
setup: func(t *testing.T, fields *fields) {
fields.ctx, fields.cancelCtx = context.WithCancel(t.Context())
executor := NewMockExecutor(t)
executor.EXPECT().Close().Return(nil)
fields.executor = executor
},
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields)
}
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
ctx: tt.fields.ctx,
cancelCtx: tt.fields.cancelCtx,
log: produceTestingLogger(t),
}
tt.wantErr(t, r.Close(), fmt.Sprintf("Close()"))
})
}
}
func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
ctx context.Context
cancelCtx context.CancelFunc
traceTransactionManagerTransactions bool
}
type args struct {
timeout time.Duration
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields)
wantErr assert.ErrorAssertionFunc
}{
{
name: "close it",
setup: func(t *testing.T, fields *fields) {
fields.ctx, fields.cancelCtx = context.WithCancel(t.Context())
executor := NewMockExecutor(t)
executor.EXPECT().Close().Return(nil)
fields.executor = executor
},
wantErr: assert.NoError,
},
{
name: "close it with timeout",
args: args{
timeout: 20 * time.Millisecond,
},
setup: func(t *testing.T, fields *fields) {
fields.ctx, fields.cancelCtx = context.WithCancel(t.Context())
executor := NewMockExecutor(t)
executor.EXPECT().Close().Return(nil)
fields.executor = executor
},
wantErr: assert.NoError,
},
{
name: "close it with timeout fires",
fields: fields{
runningRequests: []*requestTransaction{
{},
},
},
args: args{
timeout: 20 * time.Millisecond,
},
setup: func(t *testing.T, fields *fields) {
fields.ctx, fields.cancelCtx = context.WithCancel(t.Context())
executor := NewMockExecutor(t)
executor.EXPECT().Close().Return(nil)
fields.executor = executor
},
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields)
}
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
ctx: tt.fields.ctx,
cancelCtx: tt.fields.cancelCtx,
log: produceTestingLogger(t),
}
tt.wantErr(t, r.CloseGraceful(tt.args.timeout), fmt.Sprintf("CloseGraceful(%v)", tt.args.timeout))
})
}
}
func Test_requestTransactionManager_String(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
currentTransactionId int32
workLog list.List
executor pool.Executor
traceTransactionManagerTransactions bool
}
tests := []struct {
name string
fields fields
want string
}{
{
name: "string it",
fields: fields{
runningRequests: []*requestTransaction{
{
transactionId: 2,
},
},
numberOfConcurrentRequests: 3,
currentTransactionId: 4,
workLog: func() list.List {
v := list.List{}
v.PushBack(nil)
return v
}(),
executor: pool.NewFixedSizeExecutor(1, 1),
traceTransactionManagerTransactions: true,
},
want: `
╔═requestTransactionManager═══════════════════════════════════════════════════════════════════════════════════════════╗
║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗ ║
║║ ╔═transactionId╗╔═completed╗ ║║ 0x0000000000000003 3 ║║ 0x00000004 4 ║ ║
║║ ║ 0x00000002 2 ║║ b0 false ║ ║╚═══════════════════════════╝╚═════════════════════╝ ║
║║ ╚══════════════╝╚══════════╝ ║ ║
║╚═════════════════════════════════════════╝ ║
║╔═executor/executor══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
║║╔═running╗╔═shutdown╗ ║║b0 false ║║
║║║b0 false║║b0 false ║ ║╚═════════╝║
║║╚════════╝╚═════════╝ ║ ║
║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║ ║
║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║ ║
║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║ ║
║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║ ║
║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║ ║
║║╔═workItems══╗╔═traceWorkers╗ ║ ║
║║║0 element(s)║║ b0 false ║ ║ ║
║║╚════════════╝╚═════════════╝ ║ ║
║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║
║╔═traceTransactionManagerTransactions╗ ║
║║ b1 true ║ ║
║╚════════════════════════════════════╝ ║
╚═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
log: produceTestingLogger(t),
}
assert.Equalf(t, tt.want, r.String(), "String()")
})
}
}