blob: 5915360b8643ea76f2240ebbbf273442068c7bc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package simulated
import (
"context"
"runtime/debug"
"strconv"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/apache/plc4x/plc4go/spi/utils"
)
type Connection struct {
device *Device
tagHandler spi.PlcTagHandler
valueHandler spi.PlcValueHandler
options map[string][]string
connected bool
connectionId string
tracer tracer.Tracer
wg sync.WaitGroup // use to track spawned go routines
log zerolog.Logger
}
func NewConnection(device *Device, tagHandler spi.PlcTagHandler, valueHandler spi.PlcValueHandler, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
connection := &Connection{
device: device,
tagHandler: tagHandler,
valueHandler: valueHandler,
options: connectionOptions,
connected: false,
connectionId: utils.GenerateId(customLogger, 4),
log: customLogger,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
connection.tracer = tracer.NewTracer(connection.connectionId, _options...)
}
}
return connection
}
func (c *Connection) GetConnectionId() string {
return c.connectionId
}
func (c *Connection) IsTraceEnabled() bool {
return c.tracer != nil
}
func (c *Connection) GetTracer() tracer.Tracer {
return c.tracer
}
func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
return c.ConnectWithContext(context.Background())
}
func (c *Connection) ConnectWithContext(_ context.Context) <-chan plc4go.PlcConnectionConnectResult {
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
// Check if the connection was already connected
if c.connected {
if c.tracer != nil {
c.tracer.AddTrace("connect", "error: already connected")
}
// Return an error to the user.
ch <- _default.NewDefaultPlcConnectionConnectResult(c, errors.New("already connected"))
return
}
var txId string
if c.tracer != nil {
txId = c.tracer.AddTransactionalStartTrace("connect", "started")
}
if delayString, ok := c.options["connectionDelay"]; ok {
// This is the length of the array, not the string
if len(delayString) == 1 {
delay, err := strconv.Atoi(delayString[0])
if err == nil {
time.Sleep(time.Duration(delay) * time.Millisecond)
}
}
}
// If we want the connection to fail, do so, otherwise return the connection.
if errorString, ok := c.options["connectionError"]; ok {
// If the ping operation should fail with an error, do so.
if len(errorString) == 1 {
ch <- _default.NewDefaultPlcConnectionConnectResult(c, errors.New(errorString[0]))
}
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "connect", "error: "+errorString[0])
}
} else {
// Mark the connection as "connected"
c.connected = true
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "connect", "success")
}
// Return the connection in a connected state to the user.
ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
}
})
return ch
}
func (c *Connection) BlockingClose() {
<-c.Close()
}
func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
ch := make(chan plc4go.PlcConnectionCloseResult, 1)
c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
// Check if the connection is connected.
if !c.connected {
if c.tracer != nil {
c.tracer.AddTrace("close", "error: not connected")
}
// Return an error to the user.
ch <- _default.NewDefaultPlcConnectionCloseResult(c, errors.New("not connected"))
return
}
var txId string
if c.tracer != nil {
txId = c.tracer.AddTransactionalStartTrace("close", "started")
}
// If a delay was configured, wait for the pre-configured time.
if delayString, ok := c.options["closingDelay"]; ok {
// This is the length of the array, not the string
if len(delayString) == 1 {
delay, err := strconv.Atoi(delayString[0])
if err == nil {
time.Sleep(time.Duration(delay) * time.Millisecond)
}
}
}
// Mark the connection as "disconnected".
c.connected = false
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "close", "success")
}
// Return a new connection to the user.
ch <- _default.NewDefaultPlcConnectionCloseResult(c, nil)
})
return ch
}
func (c *Connection) IsConnected() bool {
return c.connected
}
func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
ch := make(chan plc4go.PlcConnectionPingResult, 1)
c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
// Check if the connection is connected
if !c.connected {
if c.tracer != nil {
c.tracer.AddTrace("ping", "error: not connected")
}
// Return an error to the user.
ch <- _default.NewDefaultPlcConnectionPingResult(errors.New("not connected"))
return
}
var txId string
if c.tracer != nil {
txId = c.tracer.AddTransactionalStartTrace("ping", "started")
}
if delayString, ok := c.options["pingDelay"]; ok {
// This is the length of the array, not the string
if len(delayString) == 1 {
delay, err := strconv.Atoi(delayString[0])
if err == nil {
time.Sleep(time.Duration(delay) * time.Millisecond)
}
}
}
if errorString, ok := c.options["pingError"]; ok {
// If the ping operation should fail with an error, do so.
if len(errorString) == 1 {
ch <- _default.NewDefaultPlcConnectionPingResult(errors.New(errorString[0]))
}
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "ping", "error: "+errorString[0])
}
} else {
// Otherwise, give a positive response.
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "ping", "success")
}
ch <- _default.NewDefaultPlcConnectionPingResult(nil)
}
})
return ch
}
func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
return &_default.DefaultConnectionMetadata{
ConnectionAttributes: map[string]string{
"connectionDelay": "Delay applied when connecting",
"closingDelay": "Delay applied when closing the connection",
"pingDelay": "Delay applied when executing a ping operation",
"readDelay": "Delay applied when executing a read operation",
"writeDelay": "Delay applied when executing a write operation",
},
ProvidesReading: true,
ProvidesWriting: true,
ProvidesSubscribing: false,
ProvidesBrowsing: false,
}
}
func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
return spiModel.NewDefaultPlcReadRequestBuilder(c.tagHandler, NewReader(c.device, c.options, c.tracer))
}
func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
return spiModel.NewDefaultPlcWriteRequestBuilder(c.tagHandler, c.valueHandler, NewWriter(c.device, c.options, c.tracer))
}
func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.tagHandler, c.valueHandler, NewSubscriber(c.device, c.options, c.tracer))
}
func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
panic("not provided by simulated connection")
}
func (c *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
panic("not provided by simulated connection")
}
func (c *Connection) String() string {
return "simulatedConnection"
}