blob: e55573ca944d5aa7843388cee70bfa88d7be2417 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package _default
import (
"context"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/utils"
)
// DefaultConnectionRequirements defines the required at a implementing connection when using DefaultConnection
// additional options can be set using the functions returning WithOption (e.g. WithDefaultTtl, WithPlcTagHandler...)
type DefaultConnectionRequirements interface {
// GetConnection should return the implementing connection when using DefaultConnection
GetConnection() plc4go.PlcConnection
// GetMessageCodec should return the spi.MessageCodec in use
GetMessageCodec() spi.MessageCodec
// ConnectWithContext is declared here for Connect redirection
ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult
}
// DefaultConnection should be used as an embedded struct. All defined methods here have default implementations
type DefaultConnection interface {
utils.Serializable
plc4go.PlcConnection
spi.TransportInstanceExposer
spi.HandlerExposer
SetConnected(connected bool)
GetTtl() time.Duration
}
// NewDefaultConnection is the factory for a DefaultConnection
func NewDefaultConnection(requirements DefaultConnectionRequirements, options ...options.WithOption) DefaultConnection {
return buildDefaultConnection(requirements, options...)
}
// WithDefaultTtl ttl is time.Second * 10 by default
func WithDefaultTtl(defaultTtl time.Duration) options.WithOption {
return withDefaultTtl{defaultTtl: defaultTtl}
}
func WithPlcTagHandler(tagHandler spi.PlcTagHandler) options.WithOption {
return withPlcTagHandler{plcTagHandler: tagHandler}
}
func WithPlcValueHandler(plcValueHandler spi.PlcValueHandler) options.WithOption {
return withPlcValueHandler{plcValueHandler: plcValueHandler}
}
///////////////////////////////////////
///////////////////////////////////////
//
// Internal section
//
type withDefaultTtl struct {
options.Option
// defaultTtl the time to live after a close
defaultTtl time.Duration
}
type withPlcTagHandler struct {
options.Option
plcTagHandler spi.PlcTagHandler
}
type withPlcValueHandler struct {
options.Option
plcValueHandler spi.PlcValueHandler
}
//go:generate go tool plc4xGenerator -type=defaultConnection
type defaultConnection struct {
DefaultConnectionRequirements `ignore:"true"`
// defaultTtl the time to live after a close
defaultTtl time.Duration
// connected indicates if a connection is connected
connected atomic.Bool
tagHandler spi.PlcTagHandler
valueHandler spi.PlcValueHandler
wg sync.WaitGroup // use to track spawned go routines
log zerolog.Logger
}
func buildDefaultConnection(requirements DefaultConnectionRequirements, _options ...options.WithOption) DefaultConnection {
defaultTtl := 10 * time.Second
var tagHandler spi.PlcTagHandler
var valueHandler spi.PlcValueHandler
for _, option := range _options {
switch option.(type) {
case withDefaultTtl:
defaultTtl = option.(withDefaultTtl).defaultTtl
case withPlcTagHandler:
tagHandler = option.(withPlcTagHandler).plcTagHandler
case withPlcValueHandler:
valueHandler = option.(withPlcValueHandler).plcValueHandler
}
}
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
return &defaultConnection{
DefaultConnectionRequirements: requirements,
defaultTtl: defaultTtl,
tagHandler: tagHandler,
valueHandler: valueHandler,
log: customLogger,
}
}
func (d *defaultConnection) SetConnected(connected bool) {
d.log.Trace().Bool("connected", connected).Msg("set connected")
d.connected.Store(connected)
}
func (d *defaultConnection) Connect() <-chan plc4go.PlcConnectionConnectResult {
return d.DefaultConnectionRequirements.ConnectWithContext(context.Background())
}
func (d *defaultConnection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
d.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
d.wg.Add(1)
go func() {
defer d.wg.Done()
defer func() {
if err := recover(); err != nil {
ch <- NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
err := d.GetMessageCodec().ConnectWithContext(ctx)
d.SetConnected(true)
connection := d.GetConnection()
ch <- NewDefaultPlcConnectionConnectResult(connection, err)
}()
return ch
}
func (d *defaultConnection) BlockingClose() {
d.log.Trace().Msg("blocking close connection")
closeResults := d.GetConnection().Close()
timeout := time.NewTimer(d.GetTtl())
d.SetConnected(false)
select {
case <-closeResults:
if !timeout.Stop() {
<-timeout.C
}
return
case <-timeout.C:
timeout.Stop()
return
}
}
func (d *defaultConnection) Close() <-chan plc4go.PlcConnectionCloseResult {
d.log.Trace().Msg("close connection")
if messageCodec := d.GetMessageCodec(); messageCodec != nil {
d.log.Trace().Msg("disconnecting message codec")
if err := messageCodec.Disconnect(); err != nil {
d.log.Warn().Err(err).Msg("Error disconnecting message code")
} else {
d.log.Trace().Msg("message codec disconnected")
}
}
var err error
if transportInstance := d.GetTransportInstance(); transportInstance != nil {
d.log.Trace().Msg("closing transport instance")
if err = transportInstance.Close(); err != nil {
d.log.Warn().Err(err).Msg("Error disconnecting transport instance")
} else {
d.log.Trace().Msg("transport instance closed")
}
}
d.SetConnected(false)
ch := make(chan plc4go.PlcConnectionCloseResult, 1)
ch <- NewDefaultPlcConnectionCloseResult(d.GetConnection(), err)
return ch
}
func (d *defaultConnection) IsConnected() bool {
// TODO: should we check here if the transport is connected?
return d.connected.Load()
}
func (d *defaultConnection) Ping() <-chan plc4go.PlcConnectionPingResult {
ch := make(chan plc4go.PlcConnectionPingResult, 1)
d.wg.Add(1)
go func() {
defer d.wg.Done()
defer func() {
if err := recover(); err != nil {
ch <- NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
if d.GetConnection().IsConnected() {
ch <- NewDefaultPlcConnectionPingResult(nil)
} else {
ch <- NewDefaultPlcConnectionPingResult(errors.New("not connected"))
}
}()
return ch
}
func (d *defaultConnection) GetTtl() time.Duration {
return d.defaultTtl
}
func (d *defaultConnection) GetMetadata() apiModel.PlcConnectionMetadata {
return &DefaultConnectionMetadata{
ConnectionAttributes: nil,
ProvidesReading: false,
ProvidesWriting: false,
ProvidesSubscribing: false,
ProvidesBrowsing: false,
}
}
func (d *defaultConnection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
panic("not provided by actual connection")
}
func (d *defaultConnection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
panic("not provided by actual connection")
}
func (d *defaultConnection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
panic("not provided by actual connection")
}
func (d *defaultConnection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
panic("not provided by actual connection")
}
func (d *defaultConnection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
panic("not provided by actual connection")
}
func (d *defaultConnection) GetTransportInstance() transports.TransportInstance {
if mc, ok := d.GetMessageCodec().(spi.TransportInstanceExposer); ok {
return mc.GetTransportInstance()
}
return nil
}
func (d *defaultConnection) GetPlcTagHandler() spi.PlcTagHandler {
return d.tagHandler
}
func (d *defaultConnection) GetPlcValueHandler() spi.PlcValueHandler {
return d.valueHandler
}
//
// Internal section
//
///////////////////////////////////////
///////////////////////////////////////