blob: cea1bfd9daeb1073559835c38753a73c6842210e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package cbus
import (
"context"
"encoding/hex"
"net/url"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/transports/test"
)
func TestAlphaGenerator_getAndIncrement(t *testing.T) {
type fields struct {
currentAlpha byte
}
tests := []struct {
name string
fields fields
want byte
}{
{
name: "get a alpha invalid instance",
},
{
name: "get a alpha",
fields: fields{
currentAlpha: 'g',
},
want: 'g',
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := &AlphaGenerator{
currentAlpha: tt.fields.currentAlpha,
}
assert.Equalf(t, tt.want, a.getAndIncrement(), "getAndIncrement()")
})
}
}
func TestAlphaGenerator_getAndIncrement_Turnaround(t *testing.T) {
a := &AlphaGenerator{
currentAlpha: 'y',
}
// Currently it is 'y' so the next call should return 'z'
assert.Equal(t, a.getAndIncrement(), uint8('y'))
// Currently it is 'z' so the next call should return 'g' as we roll over
assert.Equal(t, a.getAndIncrement(), uint8('z'))
// Currently it is 'g' so the next call should return 'h'
assert.Equal(t, a.getAndIncrement(), uint8('g'))
// the final 'h'
assert.Equal(t, a.getAndIncrement(), uint8('h'))
}
func TestConnection_BrowseRequestBuilder(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
wantAssert func(*testing.T, apiModel.PlcBrowseRequestBuilder) bool
}{
{
name: "return not nil",
wantAssert: func(t *testing.T, builder apiModel.PlcBrowseRequestBuilder) bool {
return assert.NotNil(t, builder)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.True(t, tt.wantAssert(t, c.BrowseRequestBuilder()), "BrowseRequestBuilder()")
})
}
}
func TestConnection_ConnectWithContext(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
type args struct {
ctx context.Context
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields)
wantAsserter func(*testing.T, <-chan plc4go.PlcConnectionConnectResult) bool
}{
{
name: "just connect and fail",
fields: fields{
configuration: Configuration{
Srchk: false,
Exstat: false,
Pun: false,
LocalSal: false,
Pcn: false,
Idmon: false,
Monitor: false,
Smart: false,
XonXoff: false,
Connect: false,
MonitoredApplication1: 0,
MonitoredApplication2: 0,
},
connectionId: "connectionId13",
tracer: nil,
},
args: args{ctx: context.Background()},
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
},
wantAsserter: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
assert.NotNil(t, results)
result := <-results
assert.Nil(t, result.GetConnection())
assert.NotNil(t, result.GetErr())
return true
},
},
// TODO: add error case for failing messageCodec connect
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.True(t, tt.wantAsserter(t, c.ConnectWithContext(tt.args.ctx)), "ConnectWithContext(%v)", tt.args.ctx)
// To shut down properly we always do that
c.SetConnected(false)
c.handlerWaitGroup.Wait()
})
}
}
func TestConnection_GetConnection(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
setup func(t *testing.T, fields *fields)
wantAsserter func(t *testing.T, connection plc4go.PlcConnection) bool
}{
{
name: "not nil",
wantAsserter: func(t *testing.T, connection plc4go.PlcConnection) bool {
return assert.NotNil(t, connection)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Truef(t, tt.wantAsserter(t, c.GetConnection()), "GetConnection()")
})
}
}
func TestConnection_GetConnectionId(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
want string
}{
{
name: "simple id",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.want, c.GetConnectionId(), "GetConnectionId()")
})
}
}
func TestConnection_GetMessageCodec(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
want spi.MessageCodec
}{
{
name: "just get",
fields: fields{
messageCodec: &MessageCodec{},
},
want: &MessageCodec{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.want, c.GetMessageCodec(), "GetMessageCodec()")
})
}
}
func TestConnection_GetMetadata(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
want apiModel.PlcConnectionMetadata
}{
{
name: "give metadata",
want: &_default.DefaultConnectionMetadata{
ConnectionAttributes: nil,
ProvidesReading: true,
ProvidesWriting: true,
ProvidesSubscribing: true,
ProvidesBrowsing: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.want, c.GetMetadata(), "GetMetadata()")
})
}
}
func TestConnection_GetTracer(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
want tracer.Tracer
}{
{
name: "just nil",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.want, c.GetTracer(), "GetTracer()")
})
}
}
func TestConnection_IsTraceEnabled(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
want bool
}{
{
name: "not enabled",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.want, c.IsTraceEnabled(), "IsTraceEnabled()")
})
}
}
func TestConnection_ReadRequestBuilder(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
wantAssert func(*testing.T, apiModel.PlcReadRequestBuilder) bool
}{
{
name: "return not nil",
wantAssert: func(t *testing.T, builder apiModel.PlcReadRequestBuilder) bool {
return assert.NotNil(t, builder)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Truef(t, tt.wantAssert(t, c.ReadRequestBuilder()), "ReadRequestBuilder()")
})
}
}
func TestConnection_SubscriptionRequestBuilder(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
wantAssert func(*testing.T, apiModel.PlcSubscriptionRequestBuilder) bool
}{
{
name: "return not nil",
wantAssert: func(t *testing.T, builder apiModel.PlcSubscriptionRequestBuilder) bool {
return assert.NotNil(t, builder)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Truef(t, tt.wantAssert(t, c.SubscriptionRequestBuilder()), "SubscriptionRequestBuilder()")
})
}
}
func TestConnection_UnsubscriptionRequestBuilder(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
want apiModel.PlcUnsubscriptionRequestBuilder
}{
{
name: "create one",
want: spiModel.NewDefaultPlcUnsubscriptionRequestBuilder(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.want, c.UnsubscriptionRequestBuilder(), "UnsubscriptionRequestBuilder()")
})
}
}
func TestConnection_WriteRequestBuilder(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
wantAssert func(*testing.T, apiModel.PlcWriteRequestBuilder) bool
}{
{
name: "return not nil",
wantAssert: func(t *testing.T, builder apiModel.PlcWriteRequestBuilder) bool {
return assert.NotNil(t, builder)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Truef(t, tt.wantAssert(t, c.WriteRequestBuilder()), "WriteRequestBuilder()")
})
}
}
func TestConnection_addSubscriber(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
type args struct {
subscriber *Subscriber
}
theOneSubscriber := NewSubscriber(nil)
tests := []struct {
name string
fields fields
args args
subElevator func(*testing.T, []*Subscriber) bool
}{
{
name: "new subscriber",
args: args{subscriber: NewSubscriber(nil)},
subElevator: func(t *testing.T, subscribers []*Subscriber) bool {
return len(subscribers) == 1
},
},
{
name: "existing subscriber should not be added",
args: args{subscriber: theOneSubscriber},
subElevator: func(t *testing.T, subscribers []*Subscriber) bool {
return len(subscribers) == 1
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
c.addSubscriber(tt.args.subscriber)
assert.Truef(t, tt.subElevator(t, c.subscribers), "addSubscriber(%v)", tt.args.subscriber)
})
}
}
func TestConnection_fireConnected(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer tracer.Tracer
}
type args struct {
ch chan<- plc4go.PlcConnectionConnectResult
}
tests := []struct {
name string
fields fields
args args
chanValidator func(*testing.T, chan<- plc4go.PlcConnectionConnectResult) bool
}{
{
name: "instant connect",
chanValidator: func(_ *testing.T, _ chan<- plc4go.PlcConnectionConnectResult) bool {
return true
},
},
{
name: "notified connect",
fields: fields{
driverContext: driverContextForTesting(),
},
args: args{ch: make(chan<- plc4go.PlcConnectionConnectResult, 1)},
chanValidator: func(t *testing.T, results chan<- plc4go.PlcConnectionConnectResult) bool {
time.Sleep(50 * time.Millisecond)
return len(results) == 1
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
c.fireConnected(tt.args.ch)
assert.True(t, tt.chanValidator(t, tt.args.ch))
})
}
}
func TestConnection_fireConnectionError(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer tracer.Tracer
}
type args struct {
err error
ch chan<- plc4go.PlcConnectionConnectResult
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
chanValidator func(*testing.T, chan<- plc4go.PlcConnectionConnectResult) bool
}{
{
name: "instant connect",
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
},
chanValidator: func(_ *testing.T, _ chan<- plc4go.PlcConnectionConnectResult) bool {
return true
},
},
{
name: "notified connect",
fields: fields{
driverContext: driverContextForTesting(),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
},
args: args{ch: make(chan<- plc4go.PlcConnectionConnectResult, 1)},
chanValidator: func(t *testing.T, results chan<- plc4go.PlcConnectionConnectResult) bool {
time.Sleep(50 * time.Millisecond)
return len(results) == 1
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
c.fireConnectionError(tt.args.err, tt.args.ch)
assert.True(t, tt.chanValidator(t, tt.args.ch))
})
}
}
func TestConnection_sendCalDataWrite(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
type args struct {
ctx context.Context
ch chan plc4go.PlcConnectionConnectResult
paramNo readWriteModel.Parameter
parameterValue readWriteModel.ParameterValue
requestContext *readWriteModel.RequestContext
cbusOptions *readWriteModel.CBusOptions
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields)
want bool
}{
{
name: "send something",
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
paramNo: readWriteModel.Parameter_APPLICATION_ADDRESS_2,
parameterValue: readWriteModel.NewParameterValueApplicationAddress2(readWriteModel.NewApplicationAddress2(1), nil),
requestContext: func() *readWriteModel.RequestContext {
var requestContext readWriteModel.RequestContext = readWriteModel.NewRequestContext(false)
return &requestContext
}(),
cbusOptions: func() *readWriteModel.CBusOptions {
var cBusOptions readWriteModel.CBusOptions = readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
return &cBusOptions
}(),
},
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.want, c.sendCalDataWrite(tt.args.ctx, tt.args.ch, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext, tt.args.cbusOptions), "sendCalDataWrite(%v, %v, %v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext, tt.args.cbusOptions)
})
}
}
func TestConnection_sendReset(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
type args struct {
ctx context.Context
ch chan plc4go.PlcConnectionConnectResult
cbusOptions *readWriteModel.CBusOptions
requestContext *readWriteModel.RequestContext
sendOutErrorNotification bool
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
wantOk bool
}{
{
name: "send reset",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
cbusOptions: func() *readWriteModel.CBusOptions {
var cBusOptions readWriteModel.CBusOptions = readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
return &cBusOptions
}(),
requestContext: func() *readWriteModel.RequestContext {
var requestContext readWriteModel.RequestContext = readWriteModel.NewRequestContext(false)
return &requestContext
}(),
sendOutErrorNotification: false,
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.wantOk, c.sendReset(tt.args.ctx, tt.args.ch, tt.args.cbusOptions, tt.args.requestContext, tt.args.sendOutErrorNotification), "sendReset(%v, %v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.cbusOptions, tt.args.requestContext, tt.args.sendOutErrorNotification)
})
}
}
func TestConnection_setApplicationFilter(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
type args struct {
ctx context.Context
ch chan plc4go.PlcConnectionConnectResult
requestContext *readWriteModel.RequestContext
cbusOptions *readWriteModel.CBusOptions
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
wantOk bool
}{
{
name: "set application filter (failing)",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
cbusOptions: func() *readWriteModel.CBusOptions {
var cBusOptions readWriteModel.CBusOptions = readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
return &cBusOptions
}(),
requestContext: func() *readWriteModel.RequestContext {
var requestContext readWriteModel.RequestContext = readWriteModel.NewRequestContext(false)
return &requestContext
}(),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
// Setup connection
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.wantOk, c.setApplicationFilter(tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions), "setApplicationFilter(%v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions)
})
}
}
func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
type args struct {
ctx context.Context
ch chan plc4go.PlcConnectionConnectResult
requestContext *readWriteModel.RequestContext
cbusOptions *readWriteModel.CBusOptions
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
wantOk bool
}{
{
name: "set interface 1 PUN options (failing)",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
cbusOptions: func() *readWriteModel.CBusOptions {
var cBusOptions readWriteModel.CBusOptions = readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
return &cBusOptions
}(),
requestContext: func() *readWriteModel.RequestContext {
var requestContext readWriteModel.RequestContext = readWriteModel.NewRequestContext(false)
return &requestContext
}(),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
// Setup connection
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.wantOk, c.setInterface1PowerUpSettings(tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions), "setInterface1PowerUpSettings(%v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions)
})
}
}
func TestConnection_setInterfaceOptions1(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
type args struct {
ctx context.Context
ch chan plc4go.PlcConnectionConnectResult
requestContext *readWriteModel.RequestContext
cbusOptions *readWriteModel.CBusOptions
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
want bool
}{
{
name: "set interface 1 options (failing)",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
cbusOptions: func() *readWriteModel.CBusOptions {
var cBusOptions readWriteModel.CBusOptions = readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
return &cBusOptions
}(),
requestContext: func() *readWriteModel.RequestContext {
var requestContext readWriteModel.RequestContext = readWriteModel.NewRequestContext(false)
return &requestContext
}(),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
// Setup connection
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.want, c.setInterfaceOptions1(tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions), "setInterfaceOptions1(%v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions)
})
}
}
func TestConnection_setInterfaceOptions3(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
type args struct {
ctx context.Context
ch chan plc4go.PlcConnectionConnectResult
requestContext *readWriteModel.RequestContext
cbusOptions *readWriteModel.CBusOptions
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
wantOk bool
}{
{
name: "set interface 3 options (failing)",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
cbusOptions: func() *readWriteModel.CBusOptions {
var cBusOptions readWriteModel.CBusOptions = readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
return &cBusOptions
}(),
requestContext: func() *readWriteModel.RequestContext {
var requestContext readWriteModel.RequestContext = readWriteModel.NewRequestContext(false)
return &requestContext
}(),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
// Setup connection
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
assert.Equalf(t, tt.wantOk, c.setInterfaceOptions3(tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions), "setInterfaceOptions3(%v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions)
})
}
}
func TestConnection_setupConnection(t *testing.T) {
type fields struct {
_DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
type args struct {
ctx context.Context
ch chan plc4go.PlcConnectionConnectResult
}
tests := []struct {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
validator func(t *testing.T, result plc4go.PlcConnectionConnectResult)
}{
{
name: "setup connection (failing)",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
assert.NotNil(t, result)
assert.Error(t, result.GetErr())
},
},
{
name: "setup connection (failing after reset)",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
// Build the message codec
transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
ti, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
require.NoError(t, err)
type MockState uint8
const (
RESET MockState = iota
DONE
)
currentState := atomic.Value{}
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
transportInstance.FillReadBuffer([]byte("~~~\r"))
currentState.Store(DONE)
case DONE:
t.Log("Done")
}
})
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
assert.NotNil(t, result)
assert.Error(t, result.GetErr())
},
},
{
name: "setup connection (failing after app filters)",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
// Build the message codec
transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
ti, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
require.NoError(t, err)
type MockState uint8
const (
RESET MockState = iota
APPLICATION_FILTER_1
APPLICATION_FILTER_2
DONE
)
currentState := atomic.Value{}
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
transportInstance.FillReadBuffer([]byte("~~~\r"))
currentState.Store(APPLICATION_FILTER_1)
case APPLICATION_FILTER_1:
t.Log("Dispatching app1 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
currentState.Store(APPLICATION_FILTER_2)
case APPLICATION_FILTER_2:
t.Log("Dispatching app2 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
currentState.Store(DONE)
case DONE:
t.Log("Done")
}
})
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
assert.NotNil(t, result)
assert.Error(t, result.GetErr())
},
},
{
name: "setup connection (failing after interface options 3",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
// Build the message codec
transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
ti, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
require.NoError(t, err)
type MockState uint8
const (
RESET MockState = iota
APPLICATION_FILTER_1
APPLICATION_FILTER_2
INTERFACE_OPTIONS_3
DONE
)
currentState := atomic.Value{}
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
t.Logf("Reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
transportInstance.FillReadBuffer([]byte("~~~\r"))
currentState.Store(APPLICATION_FILTER_1)
case APPLICATION_FILTER_1:
t.Log("Dispatching app1 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
currentState.Store(APPLICATION_FILTER_2)
case APPLICATION_FILTER_2:
t.Log("Dispatching app2 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
currentState.Store(INTERFACE_OPTIONS_3)
case INTERFACE_OPTIONS_3:
t.Log("Dispatching interface 3 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A342000A\r"))
transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
currentState.Store(DONE)
case DONE:
t.Log("Done")
}
})
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
assert.NotNil(t, result)
assert.Error(t, result.GetErr())
},
},
{
name: "setup connection (failing after interface options 1 pun)",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
// Build the message codec
transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
ti, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
require.NoError(t, err)
type MockState uint8
const (
RESET MockState = iota
APPLICATION_FILTER_1
APPLICATION_FILTER_2
INTERFACE_OPTIONS_3
INTERFACE_OPTIONS_1_PUN
DONE
)
currentState := atomic.Value{}
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
transportInstance.FillReadBuffer([]byte("~~~\r"))
currentState.Store(APPLICATION_FILTER_1)
case APPLICATION_FILTER_1:
t.Log("Dispatching app1 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
currentState.Store(APPLICATION_FILTER_2)
case APPLICATION_FILTER_2:
t.Log("Dispatching app2 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
currentState.Store(INTERFACE_OPTIONS_3)
case INTERFACE_OPTIONS_3:
t.Log("Dispatching interface 3 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A342000A\r"))
transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
currentState.Store(INTERFACE_OPTIONS_1_PUN)
case INTERFACE_OPTIONS_1_PUN:
t.Log("Dispatching interface 1 PUN echo and confirm???")
transportInstance.FillReadBuffer([]byte("@A3410079\r"))
transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
currentState.Store(DONE)
case DONE:
t.Log("Done")
}
})
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
assert.NotNil(t, result)
assert.Error(t, result.GetErr())
},
},
{
name: "setup connection",
args: args{
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
setup: func(t *testing.T, fields *fields, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
// Build the message codec
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
type MockState uint8
const (
RESET MockState = iota
APPLICATION_FILTER_1
APPLICATION_FILTER_2
INTERFACE_OPTIONS_3
INTERFACE_OPTIONS_1_PUN
INTERFACE_OPTIONS_1
DONE
)
currentState := atomic.Value{}
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
case RESET:
t.Log("Dispatching reset echo")
transportInstance.FillReadBuffer([]byte("~~~\r"))
currentState.Store(APPLICATION_FILTER_1)
case APPLICATION_FILTER_1:
t.Log("Dispatching app1 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
currentState.Store(APPLICATION_FILTER_2)
case APPLICATION_FILTER_2:
t.Log("Dispatching app2 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
currentState.Store(INTERFACE_OPTIONS_3)
case INTERFACE_OPTIONS_3:
t.Log("Dispatching interface 3 echo and confirm")
transportInstance.FillReadBuffer([]byte("@A342000A\r"))
transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
currentState.Store(INTERFACE_OPTIONS_1_PUN)
case INTERFACE_OPTIONS_1_PUN:
t.Log("Dispatching interface 1 PUN echo and confirm???")
transportInstance.FillReadBuffer([]byte("@A3410079\r"))
transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
currentState.Store(INTERFACE_OPTIONS_1)
case INTERFACE_OPTIONS_1:
t.Log("Dispatching interface 1 echo and confirm???")
transportInstance.FillReadBuffer([]byte("@A3300079\r"))
transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
currentState.Store(DONE)
case DONE:
t.Log("Done")
}
})
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
args.ctx = testutils.TestContext(t)
},
validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
assert.NotNil(t, result)
assert.NoError(t, result.GetErr())
assert.NotNil(t, result.GetConnection())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
c.setupConnection(tt.args.ctx, tt.args.ch)
assert.NotNil(t, tt.args.ch, "We always need a result channel")
chanTimeout := time.NewTimer(10 * time.Second)
select {
case <-chanTimeout.C:
t.Fatal("setup connection doesn't fill chan in time")
case result := <-tt.args.ch:
if tt.validator != nil {
tt.validator(t, result)
}
}
// To shut down properly we always do that
closeTimeout := time.NewTimer(10 * time.Second)
select {
case <-closeTimeout.C:
t.Fatal("close didn't react in time")
case <-c.Close():
t.Log("connection closed")
}
})
}
}
func TestConnection_startSubscriptionHandler(t *testing.T) {
type fields struct {
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration
connectionId string
tracer tracer.Tracer
}
tests := []struct {
name string
fields fields
setup func(t *testing.T, fields *fields)
manipulator func(t *testing.T, connection *Connection)
}{
{
name: "just start",
manipulator: func(t *testing.T, connection *Connection) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
},
},
{
name: "just start and feed (no subs)",
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
codec := NewMessageCodec(nil, _options...)
codec.monitoredMMIs = make(chan readWriteModel.CALReply, 1)
codec.monitoredSALs = make(chan readWriteModel.MonitoredSAL, 1)
dispatchWg := new(sync.WaitGroup)
t.Cleanup(dispatchWg.Wait)
dispatchWg.Go(func() {
codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil)
codec.monitoredSALs <- readWriteModel.NewMonitoredSALShortFormBasicMode(
0,
0,
nil,
nil,
nil,
readWriteModel.ApplicationIdContainer_ACCESS_CONTROL_D5,
nil,
)
})
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
fields.messageCodec = codec
},
manipulator: func(t *testing.T, connection *Connection) {
connection.SetConnected(true)
},
},
{
name: "just start and feed",
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
fields.subscribers = []*Subscriber{NewSubscriber(nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))}
codec := NewMessageCodec(nil, _options...)
written := make(chan struct{})
dispatchWg := new(sync.WaitGroup)
t.Cleanup(dispatchWg.Wait)
dispatchWg.Go(func() {
codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil)
codec.monitoredSALs <- readWriteModel.NewMonitoredSALShortFormBasicMode(
0,
0,
nil,
nil,
nil,
readWriteModel.ApplicationIdContainer_ACCESS_CONTROL_D5,
nil,
)
close(written)
})
t.Cleanup(func() {
<-written
})
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
fields.messageCodec = codec
},
manipulator: func(t *testing.T, connection *Connection) {
connection.SetConnected(true)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Connection{
messageCodec: tt.fields.messageCodec,
subscribers: tt.fields.subscribers,
tm: tt.fields.tm,
configuration: tt.fields.configuration,
driverContext: driverContextForTesting(),
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
log: testutils.ProduceTestingLogger(t),
}
c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...)
c.startSubscriptionHandler()
// To shut down properly we always do that
c.SetConnected(false)
c.handlerWaitGroup.Wait()
})
}
}
func TestNewConnection(t *testing.T) {
type args struct {
messageCodec *MessageCodec
configuration Configuration
driverContext DriverContext
tagHandler spi.PlcTagHandler
tm transactions.RequestTransactionManager
options map[string][]string
_options []options.WithOption
}
tests := []struct {
name string
args args
setup func(t *testing.T, args *args)
wantAssert func(*testing.T, *Connection) bool
}{
{
name: "just create the connection",
setup: func(t *testing.T, args *args) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
transport := test.NewTransport(_options...)
codec := NewMessageCodec(test.NewTransportInstance(transport, _options...), _options...)
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
args.messageCodec = codec
args._options = _options
},
wantAssert: func(t *testing.T, connection *Connection) bool {
return assert.NotNil(t, connection)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.args)
}
connection := NewConnection(tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options, tt.args._options...)
t.Cleanup(func() {
timer := time.NewTimer(1 * time.Second)
select {
case <-connection.Close():
case <-timer.C:
t.Error("timeout")
}
})
assert.True(t, tt.wantAssert(t, connection), "NewConnection(%v, %v, %v, %v, %v, %v)", tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)
})
}
}