blob: 458206291bf0396e25a94f307e32aec4408072f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package interceptors
import (
"context"
"errors"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/rs/zerolog"
)
// ReaderExposer is implemented by types that can hand out the spi.PlcReader
// they carry. InterceptReadRequest type-asserts the incoming read request to
// this interface to obtain the reader passed on to each sub-request.
type ReaderExposer interface {
// GetReader returns the spi.PlcReader held by the implementer.
GetReader() spi.PlcReader
}
// WriterExposer is implemented by types that can hand out the spi.PlcWriter
// they carry. InterceptWriteRequest type-asserts the incoming write request to
// this interface to obtain the writer passed on to each sub-request.
type WriterExposer interface {
// GetWriter returns the spi.PlcWriter held by the implementer.
GetWriter() spi.PlcWriter
}
// ReadRequestInterceptorExposer is implemented by types that can hand out
// their ReadRequestInterceptor. InterceptReadRequest type-asserts the incoming
// read request to this interface so the interceptor can be passed on to each
// sub-request.
type ReadRequestInterceptorExposer interface {
// GetReadRequestInterceptor returns the ReadRequestInterceptor held by the implementer.
GetReadRequestInterceptor() ReadRequestInterceptor
}
// WriteRequestInterceptorExposer is implemented by types that can hand out
// their WriteRequestInterceptor. InterceptWriteRequest type-asserts the
// incoming write request to this interface so the interceptor can be passed on
// to each sub-request.
type WriteRequestInterceptorExposer interface {
// GetWriteRequestInterceptor returns the WriteRequestInterceptor held by the implementer.
GetWriteRequestInterceptor() WriteRequestInterceptor
}
// readRequestFactory builds an apiModel.PlcReadRequest from a set of tags
// (keyed by tag name), the list of tag names, the reader and the read request
// interceptor. SingleItemRequestInterceptor calls it once per tag when
// splitting a multi-tag read request.
type readRequestFactory func(
tags map[string]apiModel.PlcTag,
tagNames []string,
reader spi.PlcReader,
readRequestInterceptor ReadRequestInterceptor,
) apiModel.PlcReadRequest
// writeRequestFactory builds an apiModel.PlcWriteRequest from a set of tags
// and values (both keyed by tag name), the list of tag names, the writer and
// the write request interceptor. SingleItemRequestInterceptor calls it once
// per tag when splitting a multi-tag write request.
type writeRequestFactory func(
tags map[string]apiModel.PlcTag,
tagNames []string,
values map[string]values.PlcValue,
writer spi.PlcWriter,
writeRequestInterceptor WriteRequestInterceptor,
) apiModel.PlcWriteRequest
// readResponseFactory builds an apiModel.PlcReadResponse for the given request
// from response codes and values, both keyed by tag name. It is used by
// ProcessReadResponses to create the merged response.
type readResponseFactory func(
request apiModel.PlcReadRequest,
responseCodes map[string]apiModel.PlcResponseCode,
values map[string]values.PlcValue,
) apiModel.PlcReadResponse
// writeResponseFactory builds an apiModel.PlcWriteResponse for the given
// request from response codes keyed by tag name. It is used by
// ProcessWriteResponses to create the merged response.
type writeResponseFactory func(
request apiModel.PlcWriteRequest,
responseCodes map[string]apiModel.PlcResponseCode,
) apiModel.PlcWriteResponse
// SingleItemRequestInterceptor splits read and write requests that contain
// more than one tag into one request per tag, and merges the individual
// results back into a single result for the original request.
type SingleItemRequestInterceptor struct {
// readRequestFactory creates the per-tag read requests.
readRequestFactory readRequestFactory
// writeRequestFactory creates the per-tag write requests.
writeRequestFactory writeRequestFactory
// readResponseFactory creates the merged read response.
readResponseFactory readResponseFactory
// writeResponseFactory creates the merged write response.
writeResponseFactory writeResponseFactory
// log receives the trace/debug/warn/error output of the interceptor.
log zerolog.Logger
}
// NewSingleItemRequestInterceptor creates a SingleItemRequestInterceptor that
// uses the given factories to build sub-requests and merged responses. The
// logger is taken from _options via options.ExtractCustomLogger.
func NewSingleItemRequestInterceptor(readRequestFactory readRequestFactory, writeRequestFactory writeRequestFactory, readResponseFactory readResponseFactory, writeResponseFactory writeResponseFactory, _options ...options.WithOption) SingleItemRequestInterceptor {
	var interceptor SingleItemRequestInterceptor
	interceptor.readRequestFactory = readRequestFactory
	interceptor.writeRequestFactory = writeRequestFactory
	interceptor.readResponseFactory = readResponseFactory
	interceptor.writeResponseFactory = writeResponseFactory
	interceptor.log = options.ExtractCustomLogger(_options...)
	return interceptor
}
///////////////////////////////////////
///////////////////////////////////////
//
// Internal section
//
// interceptedPlcReadRequestResult is the result returned by
// ProcessReadResponses after merging several sub-results: the original
// request, the merged response and the aggregated error (if any).
type interceptedPlcReadRequestResult struct {
	Request  apiModel.PlcReadRequest
	Response apiModel.PlcReadResponse
	Err      error
}

// GetRequest returns the read request this result belongs to.
func (r *interceptedPlcReadRequestResult) GetRequest() apiModel.PlcReadRequest {
	return r.Request
}

// GetResponse returns the read response stored in this result.
func (r *interceptedPlcReadRequestResult) GetResponse() apiModel.PlcReadResponse {
	return r.Response
}

// GetErr returns the error stored in this result, or nil if there is none.
func (r *interceptedPlcReadRequestResult) GetErr() error {
	return r.Err
}
// interceptedPlcWriteRequestResult is the result returned by
// ProcessWriteResponses after merging several sub-results: the original
// request, the merged response and the aggregated error (if any).
type interceptedPlcWriteRequestResult struct {
	Request  apiModel.PlcWriteRequest
	Response apiModel.PlcWriteResponse
	Err      error
}

// GetRequest returns the write request this result belongs to.
func (r *interceptedPlcWriteRequestResult) GetRequest() apiModel.PlcWriteRequest {
	return r.Request
}

// GetResponse returns the write response stored in this result.
func (r *interceptedPlcWriteRequestResult) GetResponse() apiModel.PlcWriteResponse {
	return r.Response
}

// GetErr returns the error stored in this result, or nil if there is none.
func (r *interceptedPlcWriteRequestResult) GetErr() error {
	return r.Err
}
//
// Internal section
//
///////////////////////////////////////
///////////////////////////////////////
// InterceptReadRequest splits readRequest into one read request per tag.
//
// A nil request or a request without tags yields nil. A request with exactly
// one tag is returned unchanged as the only element. For more tags, one
// sub-request per tag is created through the read request factory, using the
// reader and interceptor exposed by readRequest (which therefore must
// implement ReaderExposer and ReadRequestInterceptorExposer, otherwise the
// type assertion panics). If ctx is done while splitting, nil is returned.
func (m SingleItemRequestInterceptor) InterceptReadRequest(ctx context.Context, readRequest apiModel.PlcReadRequest) []apiModel.PlcReadRequest {
	if readRequest == nil {
		return nil
	}
	switch len(readRequest.GetTagNames()) {
	case 0:
		return nil
	case 1:
		// Shortcut: nothing to split.
		m.log.Debug().Msg("We got only one request, no splitting required")
		return []apiModel.PlcReadRequest{readRequest}
	}
	m.log.Trace().Msg("Splitting requests")
	// Every tag gets its own read request.
	var split []apiModel.PlcReadRequest
	for _, name := range readRequest.GetTagNames() {
		if ctxErr := ctx.Err(); ctxErr != nil {
			m.log.Warn().Err(ctxErr).Msg("aborting early")
			return nil
		}
		m.log.Debug().Str("tagName", name).Msg("Splitting into own request")
		singleTag := map[string]apiModel.PlcTag{name: readRequest.GetTag(name)}
		reader := readRequest.(ReaderExposer).GetReader()
		interceptor := readRequest.(ReadRequestInterceptorExposer).GetReadRequestInterceptor()
		split = append(split, m.readRequestFactory(singleTag, []string{name}, reader, interceptor))
	}
	return split
}
// ProcessReadResponses merges the results of the per-tag sub-requests into a
// single result for readRequest.
//
// If readResults contains exactly one element it is returned unchanged.
// Otherwise the response codes and values of all successful sub-responses are
// collected and handed to the read response factory together with readRequest.
//
// Error handling:
//   - errors reported by sub-results are all collected into a
//     utils.MultiError whose MainError is "while aggregating results";
//   - if ctx is done, merging stops; the context error is appended to the
//     already collected errors, or returned on its own if there are none.
//
// The returned result always carries a response built from whatever was
// merged so far, even when Err is non-nil.
func (m SingleItemRequestInterceptor) ProcessReadResponses(ctx context.Context, readRequest apiModel.PlcReadRequest, readResults []apiModel.PlcReadRequestResult) apiModel.PlcReadRequestResult {
	if len(readResults) == 1 {
		m.log.Debug().Msg("We got only one response, no merging required")
		return readResults[0]
	}
	m.log.Trace().Msg("Merging requests")
	responseCodes := map[string]apiModel.PlcResponseCode{}
	val := map[string]values.PlcValue{}
	var (
		err       error
		subErrors []error // collected here; utils.MultiError is a value type, so appending to a type-asserted copy would be lost
	)
	for _, readResult := range readResults {
		if ctxErr := ctx.Err(); ctxErr != nil {
			m.log.Warn().Err(ctxErr).Msg("aborting early")
			if len(subErrors) > 0 {
				subErrors = append(subErrors, ctxErr)
			} else {
				err = ctxErr
			}
			break
		}
		if resultErr := readResult.GetErr(); resultErr != nil {
			m.log.Debug().Err(resultErr).Msg("Error during read")
			subErrors = append(subErrors, resultErr)
			continue
		}
		response := readResult.GetResponse()
		if response == nil {
			continue
		}
		tagNames := response.GetRequest().GetTagNames()
		if len(tagNames) > 1 {
			m.log.Error().Int("numberOfTags", len(tagNames)).Msg("We should only get 1")
		}
		for _, tagName := range tagNames {
			responseCodes[tagName] = response.GetResponseCode(tagName)
			val[tagName] = response.GetValue(tagName)
		}
	}
	if len(subErrors) > 0 {
		err = utils.MultiError{MainError: errors.New("while aggregating results"), Errors: subErrors}
	}
	return &interceptedPlcReadRequestResult{
		Request:  readRequest,
		Response: m.readResponseFactory(readRequest, responseCodes, val),
		Err:      err,
	}
}
// InterceptWriteRequest splits writeRequest into one write request per tag.
//
// A nil request or a request without tags yields nil (same guard as
// InterceptReadRequest). A request with exactly one tag is returned unchanged
// as the only element. For more tags, one sub-request per tag is created
// through the write request factory, using the writer and interceptor exposed
// by writeRequest (which therefore must implement WriterExposer and
// WriteRequestInterceptorExposer, otherwise the type assertion panics). If ctx
// is done while splitting, nil is returned.
func (m SingleItemRequestInterceptor) InterceptWriteRequest(ctx context.Context, writeRequest apiModel.PlcWriteRequest) []apiModel.PlcWriteRequest {
	if writeRequest == nil || len(writeRequest.GetTagNames()) == 0 {
		return nil
	}
	// If this request just has one tag, go the shortcut
	if len(writeRequest.GetTagNames()) == 1 {
		m.log.Debug().Msg("We got only one request, no splitting required")
		return []apiModel.PlcWriteRequest{writeRequest}
	}
	m.log.Trace().Msg("Splitting requests")
	// In all other cases, create a new write request containing only one item
	tagNames := writeRequest.GetTagNames()
	writeRequests := make([]apiModel.PlcWriteRequest, 0, len(tagNames))
	for _, tagName := range tagNames {
		if err := ctx.Err(); err != nil {
			m.log.Warn().Err(err).Msg("aborting early")
			return nil
		}
		m.log.Debug().Str("tagName", tagName).Msg("Splitting into own request")
		tag := writeRequest.GetTag(tagName)
		subWriteRequest := m.writeRequestFactory(
			map[string]apiModel.PlcTag{tagName: tag},
			[]string{tagName},
			map[string]values.PlcValue{tagName: writeRequest.GetValue(tagName)},
			writeRequest.(WriterExposer).GetWriter(),
			writeRequest.(WriteRequestInterceptorExposer).GetWriteRequestInterceptor(),
		)
		writeRequests = append(writeRequests, subWriteRequest)
	}
	return writeRequests
}
// ProcessWriteResponses merges the results of the per-tag sub-requests into a
// single result for writeRequest.
//
// If writeResults contains exactly one element it is returned unchanged.
// Otherwise the response codes of all successful sub-responses are collected
// and handed to the write response factory together with writeRequest.
//
// Error handling:
//   - errors reported by sub-results are all collected into a
//     utils.MultiError whose MainError is "while aggregating results";
//   - if ctx is done, merging stops; the context error is appended to the
//     already collected errors, or returned on its own if there are none.
//
// The returned result always carries a response built from whatever was
// merged so far, even when Err is non-nil.
func (m SingleItemRequestInterceptor) ProcessWriteResponses(ctx context.Context, writeRequest apiModel.PlcWriteRequest, writeResults []apiModel.PlcWriteRequestResult) apiModel.PlcWriteRequestResult {
	if len(writeResults) == 1 {
		m.log.Debug().Msg("We got only one response, no merging required")
		return writeResults[0]
	}
	m.log.Trace().Msg("Merging requests")
	responseCodes := map[string]apiModel.PlcResponseCode{}
	var (
		err       error
		subErrors []error // collected here; utils.MultiError is a value type, so appending to a type-asserted copy would be lost
	)
	for _, writeResult := range writeResults {
		if ctxErr := ctx.Err(); ctxErr != nil {
			m.log.Warn().Err(ctxErr).Msg("aborting early")
			if len(subErrors) > 0 {
				subErrors = append(subErrors, ctxErr)
			} else {
				err = ctxErr
			}
			break
		}
		if resultErr := writeResult.GetErr(); resultErr != nil {
			m.log.Debug().Err(resultErr).Msg("Error during write")
			subErrors = append(subErrors, resultErr)
			continue
		}
		response := writeResult.GetResponse()
		if response == nil {
			continue
		}
		tagNames := response.GetRequest().GetTagNames()
		if len(tagNames) > 1 {
			m.log.Error().Int("numberOfTags", len(tagNames)).Msg("We should only get 1")
		}
		for _, tagName := range tagNames {
			responseCodes[tagName] = response.GetResponseCode(tagName)
		}
	}
	if len(subErrors) > 0 {
		err = utils.MultiError{MainError: errors.New("while aggregating results"), Errors: subErrors}
	}
	return &interceptedPlcWriteRequestResult{
		Request:  writeRequest,
		Response: m.writeResponseFactory(writeRequest, responseCodes),
		Err:      err,
	}
}