blob: 8ab5f0ffaef1cc0d4622c5f77cce34281295df0b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package ads
import (
"context"
stdErrors "errors"
"runtime/debug"
"time"
"github.com/pkg/errors"
dirverModel "github.com/apache/plc4x/plc4go/internal/ads/model"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/utils"
)
func (m *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
return spiModel.NewDefaultPlcSubscriptionRequestBuilder(m.GetPlcTagHandler(), m.GetPlcValueHandler(), m)
}
func (m *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
// TODO: Implement this ...
return nil
}
func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
defaultSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
// Subscription requests are unfortunately now supported as multi-item requests,
// so we have to send one subscription request for every tag.
// Prepare the subscription requests, by ensuring any symbolic tags are correctly resolved
// and splitting them up into single-tag requests.
subSubscriptionRequests := map[string]apiModel.PlcSubscriptionRequest{}
subResults := map[string]apiModel.PlcSubscriptionRequestResult{}
// Iterate over all requests and add the result-channels to the list
for _, tagName := range subscriptionRequest.GetTagNames() {
tag := subscriptionRequest.GetTag(tagName)
var directTag dirverModel.DirectPlcTag
switch tag.(type) {
case dirverModel.DirectPlcTag:
directTag = tag.(dirverModel.DirectPlcTag)
case dirverModel.SymbolicPlcTag:
symbolicTag := tag.(dirverModel.SymbolicPlcTag)
directTagPtr, err := m.driverContext.getDirectTagForSymbolTag(symbolicTag)
if err != nil {
subResults[tagName] = spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Wrap(err, "error resolving symbolic tag"))
continue
}
directTag = *directTagPtr
default:
subResults[tagName] = spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.New("invalid tag type"))
continue
}
subscriptionType := defaultSubscriptionRequest.GetType(tagName)
interval := defaultSubscriptionRequest.GetInterval(tagName)
preRegisteredConsumers := defaultSubscriptionRequest.GetPreRegisteredConsumers(tagName)
subSubscriptionRequests[tagName] = spiModel.NewDefaultPlcSubscriptionRequest(
m,
[]string{tagName},
map[string]apiModel.PlcSubscriptionTag{tagName: directTag},
map[string]apiModel.PlcSubscriptionType{tagName: subscriptionType},
map[string]time.Duration{tagName: interval},
map[string][]apiModel.PlcSubscriptionEventConsumer{tagName: preRegisteredConsumers},
)
}
// If this is a single item request, we can take a shortcut.
if len(subSubscriptionRequests) == 1 {
tagName := subscriptionRequest.GetTagNames()[0]
subSubscriptionRequest := subSubscriptionRequests[tagName]
return m.subscribe(ctx, subSubscriptionRequest)
}
// Create a sub-result-channel map and prepare execute the subscriptions.
subResultChannels := map[string]<-chan apiModel.PlcSubscriptionRequestResult{}
for tagName, subSubscriptionRequest := range subSubscriptionRequests {
subResultChannels[tagName] = m.subscribe(ctx, subSubscriptionRequest)
}
// Create a new result-channel, which completes as soon as all sub-result-channels have returned
globalResultChannel := make(chan apiModel.PlcSubscriptionRequestResult, 1)
m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
Str("stack", string(debug.Stack())).
Interface("err", err).
Msg("panic-ed")
}
}()
// Iterate over all sub-results
for _, subResultChannel := range subResultChannels {
select {
case <-ctx.Done():
globalResultChannel <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, ctx.Err())
return
case subResult := <-subResultChannel:
// These are all single value requests ... so it's safe to assume this shortcut.
tagName := subResult.GetRequest().GetTagNames()[0]
subResults[tagName] = subResult
}
}
// As soon as all are done, process the results
result := m.processSubscriptionResponses(ctx, subscriptionRequest, subResults)
// Return the final result
globalResultChannel <- result
})
return globalResultChannel
}
func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
responseChan := make(chan apiModel.PlcSubscriptionRequestResult, 1)
m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
responseChan <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
// At this point we are sure to only have single item direct tag requests.
tagName := subscriptionRequest.GetTagNames()[0]
directTag := subscriptionRequest.GetTag(tagName).(dirverModel.DirectPlcTag)
if directTag.DataType == nil {
directTag.DataType = m.driverContext.dataTypeTable[directTag.ValueType.String()]
}
response, err := m.ExecuteAdsAddDeviceNotificationRequest(ctx, directTag.IndexGroup, directTag.IndexOffset, directTag.DataType.GetSize(), model.AdsTransMode_ON_CHANGE, 0, 0)
if err != nil {
responseChan <- spiModel.NewDefaultPlcSubscriptionRequestResult(
subscriptionRequest,
nil,
err,
)
}
// Create a new subscription handle.
subscriptionHandle := dirverModel.NewAdsSubscriptionHandle(
m,
tagName,
directTag,
append(m._options, options.WithCustomLogger(m.log))...,
)
responseChan <- spiModel.NewDefaultPlcSubscriptionRequestResult(
subscriptionRequest,
spiModel.NewDefaultPlcSubscriptionResponse(
subscriptionRequest,
map[string]apiModel.PlcResponseCode{tagName: apiModel.PlcResponseCode_OK},
map[string]apiModel.PlcSubscriptionHandle{tagName: subscriptionHandle},
append(m._options, options.WithCustomLogger(m.log))...,
),
nil,
)
// Store it together with the returned ADS handle.
m.subscriptions[response.GetNotificationHandle()] = subscriptionHandle
})
return responseChan
}
func (m *Connection) processSubscriptionResponses(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest, subscriptionResults map[string]apiModel.PlcSubscriptionRequestResult) apiModel.PlcSubscriptionRequestResult {
if len(subscriptionResults) == 1 {
m.log.Debug().Msg("We got only one response, no merging required")
for tagName := range subscriptionResults {
return subscriptionResults[tagName]
}
}
m.log.Trace().Msg("Merging requests")
responseCodes := map[string]apiModel.PlcResponseCode{}
subscriptionHandles := map[string]apiModel.PlcSubscriptionHandle{}
var collectedErrors []error
for _, subscriptionResult := range subscriptionResults {
if subErr := subscriptionResult.GetErr(); subErr != nil {
m.log.Debug().Err(subErr).Msg("Error during subscription")
collectedErrors = append(collectedErrors, subErr)
} else if response := subscriptionResult.GetResponse(); response != nil {
request := response.GetRequest()
tagNames := request.GetTagNames()
if len(tagNames) > 1 {
m.log.Error().Int("numberOfTags", len(tagNames)).Msg("We should only get 1")
}
for _, tagName := range tagNames {
handle, err := response.GetSubscriptionHandle(tagName)
if err != nil {
collectedErrors = append(collectedErrors, err)
responseCodes[tagName] = apiModel.PlcResponseCode_REMOTE_ERROR
subscriptionHandles[tagName] = nil
} else {
responseCodes[tagName] = response.GetResponseCode(tagName)
subscriptionHandles[tagName] = handle
}
}
}
}
var errResult error
if err := stdErrors.Join(collectedErrors...); err != nil {
errResult = errors.Wrap(err, "while aggregating results")
}
return spiModel.NewDefaultPlcSubscriptionRequestResult(
subscriptionRequest,
spiModel.NewDefaultPlcSubscriptionResponse(
subscriptionRequest,
responseCodes,
subscriptionHandles,
append(m._options, options.WithCustomLogger(m.log))...,
),
errResult,
)
}
func (m *Connection) handleIncomingDeviceNotificationRequest(deviceNotificationRequest model.AdsDeviceNotificationRequest) {
for _, stampHeader := range deviceNotificationRequest.GetAdsStampHeaders() {
for _, sample := range stampHeader.GetAdsNotificationSamples() {
notificationHandle := sample.GetNotificationHandle()
subscriptionHandler, ok := m.subscriptions[notificationHandle]
if !ok {
continue
}
adsSubscriptionHandler, ok := subscriptionHandler.(*dirverModel.AdsSubscriptionHandle)
if !ok {
continue
}
// If no one is consuming, we don't need to waste effort in parsing.
if adsSubscriptionHandler.GetNumConsumers() == 0 {
continue
}
// Create a readBuffer containing the sample data
readBuffer := utils.NewReadBufferByteBased(sample.GetData())
// Parse the data according to theƏ type data stored in the tag
directTag := adsSubscriptionHandler.GetDirectTag()
plcValue, err := m.parsePlcValue(directTag.DataType, directTag.DataType.GetArrayInfo(), readBuffer)
if err != nil {
continue
}
// Publish the parsed value to the subscribers.
adsSubscriptionHandler.PublishPlcValue(plcValue)
}
}
}
func (m *Connection) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
return nil
}
func (m *Connection) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
return nil
}
func (m *Connection) Unregister(registration apiModel.PlcConsumerRegistration) {
}