| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package opcua |
| |
| import ( |
| "context" |
| "sync" |
| "time" |
| |
| "github.com/pkg/errors" |
| "github.com/rs/zerolog" |
| |
| "github.com/apache/plc4x/plc4go/pkg/api" |
| apiModel "github.com/apache/plc4x/plc4go/pkg/api/model" |
| "github.com/apache/plc4x/plc4go/spi" |
| "github.com/apache/plc4x/plc4go/spi/default" |
| spiModel "github.com/apache/plc4x/plc4go/spi/model" |
| "github.com/apache/plc4x/plc4go/spi/options" |
| "github.com/apache/plc4x/plc4go/spi/tracer" |
| ) |
| |
// Connection is the OPC UA driver's connection. It embeds the shared
// _default.DefaultConnection and combines it with the message codec and the
// SecureChannel that receive the connect and disconnect calls.
//
//go:generate go tool plc4xGenerator -type=Connection
type Connection struct {
	_default.DefaultConnection
	// messageCodec is connected in Connect and disconnected again by fireConnectionError.
	messageCodec *MessageCodec
	// subscribers collects the subscribers registered through addSubscriber, without duplicates.
	subscribers []*Subscriber

	configuration Configuration `stringer:"true"`
	driverContext DriverContext `stringer:"true"`

	// channel is built in NewConnection; it receives the onDiscover, onConnect and onDisconnect calls.
	channel *SecureChannel

	// connectEvent is signalled by fireConnected and awaited by setupConnection.
	connectEvent chan struct{}
	// connectTimeout bounds how long setupConnection waits for connectEvent.
	connectTimeout time.Duration // TODO: do we need to have that in general, where to get that from
	// disconnectEvent is signalled by fireConnectionError and awaited by Close.
	disconnectEvent chan struct{}
	// disconnectTimeout bounds how long Close waits for disconnectEvent.
	disconnectTimeout time.Duration // TODO: do we need to have that in general, where to get that from

	// connectionId is handed to the tracer and returned by GetConnectionId.
	// NOTE(review): nothing visible in this file assigns it — confirm where it is set.
	connectionId string
	// tracer stays nil unless NewConnection finds a single-valued "traceEnabled" option.
	tracer tracer.Tracer

	wg sync.WaitGroup // use to track spawned go routines

	log      zerolog.Logger       `ignore:"true"`
	_options []options.WithOption `ignore:"true"` // Used to pass them downstream
}
| |
| func NewConnection(messageCodec *MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, connectionOptions map[string][]string, _options ...options.WithOption) *Connection { |
| customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) |
| connection := &Connection{ |
| messageCodec: messageCodec, |
| configuration: configuration, |
| driverContext: driverContext, |
| channel: NewSecureChannel(customLogger, driverContext, configuration), |
| connectEvent: make(chan struct{}), |
| connectTimeout: 5 * time.Second, |
| disconnectEvent: make(chan struct{}), |
| disconnectTimeout: 5 * time.Second, |
| log: customLogger, |
| _options: _options, |
| } |
| if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok { |
| if len(traceEnabledOption) == 1 { |
| connection.tracer = tracer.NewTracer(connection.connectionId, _options...) |
| } |
| } |
| connection.DefaultConnection = _default.NewDefaultConnection( |
| connection, |
| append(_options, |
| _default.WithPlcTagHandler(tagHandler), |
| )..., |
| ) |
| return connection |
| } |
| |
| func (c *Connection) GetConnectionId() string { |
| return c.connectionId |
| } |
| |
| func (c *Connection) IsTraceEnabled() bool { |
| return c.tracer != nil |
| } |
| |
| func (c *Connection) GetTracer() tracer.Tracer { |
| return c.tracer |
| } |
| |
| func (c *Connection) GetConnection() plc4go.PlcConnection { |
| return c |
| } |
| |
| func (c *Connection) GetMessageCodec() spi.MessageCodec { |
| return c.messageCodec |
| } |
| |
| func (c *Connection) Connect(ctx context.Context) error { |
| c.log.Trace().Msg("Connecting") |
| c.log.Trace().Msg("connecting codec") |
| if err := c.messageCodec.Connect(ctx); err != nil { |
| return errors.Wrap(err, "Error connecting codec") |
| } |
| |
| if c.driverContext.fireDiscoverEvent { |
| c.log.Trace().Msg("calling onDiscover") |
| c.channel.onDiscover(ctx, c.messageCodec) |
| } else { |
| c.log.Trace().Msg("we don't wait for session discover") |
| } |
| |
| // For testing purposes we can skip the waiting for a complete connection |
| if !c.driverContext.awaitSetupComplete { |
| c.wg.Go(func() { |
| if err := c.setupConnection(ctx); err != nil { |
| c.log.Error().Err(err).Msg("Error during setup") |
| } |
| }) |
| c.log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!") |
| // Here we write directly and don't wait till the connection is "really" connected |
| // Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete |
| c.SetConnected(true) |
| return nil |
| } |
| |
| if err := c.setupConnection(ctx); err != nil { |
| return errors.Wrap(err, "Error setting up connection") |
| } |
| return nil |
| } |
| |
| func (c *Connection) Close() error { |
| ctx := context.TODO() |
| ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) |
| defer cancelFunc() |
| |
| c.channel.onDisconnect(ctx, c) |
| disconnectTimeout := time.NewTimer(c.disconnectTimeout) |
| select { |
| case <-c.disconnectEvent: |
| c.log.Info().Msg("disconnected") |
| case <-disconnectTimeout.C: |
| return errors.Errorf("timeout after %s", c.disconnectTimeout) |
| } |
| return nil |
| } |
| |
| func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata { |
| return &_default.DefaultConnectionMetadata{ |
| ProvidesReading: true, |
| ProvidesWriting: true, |
| ProvidesSubscribing: true, |
| ProvidesBrowsing: false, |
| } |
| } |
| |
| func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder { |
| return spiModel.NewDefaultPlcReadRequestBuilder( |
| c.GetPlcTagHandler(), |
| NewReader( |
| c, |
| append(c._options, options.WithCustomLogger(c.log))..., |
| ), |
| ) |
| } |
| |
| func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder { |
| return spiModel.NewDefaultPlcWriteRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewWriter(c)) |
| } |
| |
| func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder { |
| return spiModel.NewDefaultPlcSubscriptionRequestBuilder( |
| c.GetPlcTagHandler(), |
| c.GetPlcValueHandler(), |
| NewSubscriber( |
| c.addSubscriber, |
| c, |
| append(c._options, options.WithCustomLogger(c.log))..., |
| ), |
| ) |
| } |
| |
| func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder { |
| // TODO: where do we get the unsubscriber from |
| return nil |
| } |
| |
| func (c *Connection) addSubscriber(subscriber *Subscriber) { |
| for _, sub := range c.subscribers { |
| if sub == subscriber { |
| c.log.Debug().Stringer("subscriber", subscriber).Msg("Subscriber already added") |
| return |
| } |
| } |
| c.subscribers = append(c.subscribers, subscriber) |
| } |
| |
| func (c *Connection) setupConnection(ctx context.Context) error { |
| c.log.Trace().Msg("setup connection") |
| |
| c.log.Debug().Msg("Opcua Driver running in ACTIVE mode.") |
| if err := c.channel.onConnect(ctx, c); err != nil { |
| return errors.Wrap(err, "Error during connection setup") |
| } |
| |
| connectTimeout := time.NewTimer(c.connectTimeout) |
| select { |
| case <-c.connectEvent: |
| c.log.Info().Msg("connected") |
| c.SetConnected(true) |
| c.log.Trace().Msg("Connect fired") |
| case <-connectTimeout.C: |
| return errors.Errorf("timeout after %s", c.connectTimeout) |
| } |
| |
| c.log.Trace().Msg("connection setup done") |
| return nil |
| } |
| |
| func (c *Connection) fireConnectionError(err error, ch chan<- error) { |
| c.log.Trace().Err(err).Msg("fire connection error") |
| if c.driverContext.awaitSetupComplete { |
| ch <- errors.Wrap(err, "Error during connection") |
| } else { |
| c.log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect") |
| } |
| if err := c.messageCodec.Disconnect(); err != nil { |
| c.log.Debug().Err(err).Msg("Error disconnecting message codec on connection error") |
| } |
| c.SetConnected(false) |
| select { |
| case c.disconnectEvent <- struct{}{}: |
| default: |
| } |
| } |
| |
| func (c *Connection) fireConnected(ch chan<- struct{}) { |
| c.log.Trace().Msg("fire connected") |
| if c.driverContext.awaitSetupComplete { |
| ch <- struct{}{} |
| } else { |
| c.log.Info().Msg("Successfully connected") |
| } |
| c.SetConnected(true) |
| select { |
| case c.connectEvent <- struct{}{}: |
| default: |
| } |
| } |