blob: cbc2634694d1691818e32d5b2f873f467c876f37 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package knxnetip
import (
"context"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/utils"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
)
type Reader struct {
connection *Connection
wg sync.WaitGroup // use to track spawned go routines
log zerolog.Logger
}
func NewReader(connection *Connection, _options ...options.WithOption) *Reader {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
return &Reader{
connection: connection,
log: customLogger,
}
}
func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
// TODO: handle ctx
resultChan := make(chan apiModel.PlcReadRequestResult, 1)
m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
resultChan <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
responseCodes := map[string]apiModel.PlcResponseCode{}
plcValues := map[string]apiValues.PlcValue{}
// Sort the tags in direct properties and memory addresses, which will have to be actively
// read from the devices and group-addresses which will be locally processed from the local cache.
deviceAddresses := map[driverModel.KnxAddress]map[string]DeviceTag{}
groupAddresses := map[string]GroupAddressTag{}
for _, tagName := range readRequest.GetTagNames() {
// Get the knx knxTag
knxTag, err := CastToKnxTagFromPlcTag(readRequest.GetTag(tagName))
if err != nil {
responseCodes[tagName] = apiModel.PlcResponseCode_INVALID_ADDRESS
plcValues[tagName] = nil
continue
}
switch knxTag.(type) {
case DevicePropertyAddressPlcTag:
propertyTag := knxTag.(DevicePropertyAddressPlcTag)
knxAddress := propertyTag.toKnxAddress()
if knxAddress == nil {
continue
}
if _, ok := deviceAddresses[knxAddress]; !ok {
deviceAddresses[knxAddress] = map[string]DeviceTag{}
}
deviceAddresses[knxAddress][tagName] = propertyTag
case DeviceMemoryAddressPlcTag:
memoryTag := knxTag.(DeviceMemoryAddressPlcTag)
knxAddress := memoryTag.toKnxAddress()
if knxAddress == nil {
continue
}
if _, ok := deviceAddresses[knxAddress]; !ok {
deviceAddresses[knxAddress] = map[string]DeviceTag{}
}
deviceAddresses[knxAddress][tagName] = memoryTag
case GroupAddressTag:
groupAddressTag := knxTag.(GroupAddressTag)
groupAddresses[tagName] = groupAddressTag
default:
responseCodes[tagName] = apiModel.PlcResponseCode_INVALID_ADDRESS
plcValues[tagName] = nil
}
}
// Process the direct properties.
// Connect to each knx device and read all of the properties on that particular device.
for deviceAddress, tags := range deviceAddresses {
// Collect all the properties on this device
for tagName, tag := range tags {
switch tag.(type) {
case DevicePropertyAddressPlcTag:
propertyTag := tag.(DevicePropertyAddressPlcTag)
timeout := time.NewTimer(m.connection.defaultTtl)
results := m.connection.DeviceReadProperty(ctx, deviceAddress, propertyTag.ObjectId, propertyTag.PropertyId, propertyTag.PropertyIndex, propertyTag.NumElements)
select {
case result := <-results:
if !timeout.Stop() {
<-timeout.C
}
if result.err == nil {
responseCodes[tagName] = apiModel.PlcResponseCode_OK
plcValues[tagName] = result.value
} else {
responseCodes[tagName] = apiModel.PlcResponseCode_INTERNAL_ERROR
plcValues[tagName] = nil
}
case <-timeout.C:
timeout.Stop()
responseCodes[tagName] = apiModel.PlcResponseCode_REMOTE_BUSY
plcValues[tagName] = nil
}
case DeviceMemoryAddressPlcTag:
timeout := time.NewTimer(m.connection.defaultTtl)
memoryTag := tag.(DeviceMemoryAddressPlcTag)
results := m.connection.DeviceReadMemory(ctx, deviceAddress, memoryTag.Address, memoryTag.NumElements, memoryTag.TagType)
select {
case result := <-results:
if !timeout.Stop() {
<-timeout.C
}
if result.err == nil {
responseCodes[tagName] = apiModel.PlcResponseCode_OK
plcValues[tagName] = result.value
} else {
responseCodes[tagName] = apiModel.PlcResponseCode_INTERNAL_ERROR
plcValues[tagName] = nil
}
case <-timeout.C:
timeout.Stop()
responseCodes[tagName] = apiModel.PlcResponseCode_REMOTE_BUSY
plcValues[tagName] = nil
}
}
}
}
// Get the group address values from the cache
for tagName, tag := range groupAddresses {
responseCode, plcValue := m.readGroupAddress(ctx, tag)
responseCodes[tagName] = responseCode
plcValues[tagName] = plcValue
}
// Assemble the results
result := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
resultChan <- spiModel.NewDefaultPlcReadRequestResult(
readRequest,
result,
nil,
)
})
return resultChan
}
func (m *Reader) readGroupAddress(ctx context.Context, tag GroupAddressTag) (apiModel.PlcResponseCode, apiValues.PlcValue) {
rawAddresses, err := m.resolveAddresses(tag)
if err != nil {
m.log.Debug().Err(err).Msg("error resolving addresses")
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
// First resolve any pattern to a list of fully qualified group addresses.
// Then check if any of them is available in the local value cache,
// if not send a group address read-request.
values := map[string]apiValues.PlcValue{}
returnCodes := map[string]apiModel.PlcResponseCode{}
for _, numericAddress := range rawAddresses {
// Create a string representation of this numeric address depending on the type of requested address
stringAddress, err := NumericGroupAddressToString(numericAddress, tag)
if err != nil {
m.log.Debug().Err(err).Msg("error mapping addresses")
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
// Try to get a value from the cache
m.connection.valueCacheMutex.RLock()
int8s, ok := m.connection.valueCache[numericAddress]
m.connection.valueCacheMutex.RUnlock()
// If nothing was found in the cache, try to execute a group address read,
// Otherwise respond with values from the cache.
if !ok {
addr := []byte{byte(numericAddress >> 8), byte(numericAddress & 0xFF)}
rrc := m.connection.ReadGroupAddress(ctx, addr, tag.GetTagType())
select {
case readResult := <-rrc:
if readResult.value != nil {
if readResult.err == nil {
returnCodes[stringAddress] = apiModel.PlcResponseCode_OK
values[stringAddress] = readResult.value
} else {
returnCodes[stringAddress] = apiModel.PlcResponseCode_INTERNAL_ERROR
values[stringAddress] = nil
}
} else {
returnCodes[stringAddress] = apiModel.PlcResponseCode_NOT_FOUND
values[stringAddress] = nil
}
// TODO: Do we need a "default" case here?
}
} else {
// If we don't have any tag-type information, add the raw data
if tag.GetTagType() == nil {
values[stringAddress] = spiValues.NewPlcRawByteArray(int8s)
} else {
// Decode the data according to the tags type
rb := utils.NewReadBufferByteBased(int8s)
if tag.GetTagType() == nil {
return apiModel.PlcResponseCode_INVALID_DATATYPE, nil
}
// If the size of the tag is greater than 6, we have to skip the first byte
if tag.GetTagType().GetLengthInBits(context.Background()) > 6 {
_, _ = rb.ReadUint8("tagType", 8)
}
plcValue, err := driverModel.KnxDatapointParseWithBuffer(context.Background(), rb, *tag.GetTagType())
// If any of the values doesn't decode correctly, we can't return any
if err != nil {
return apiModel.PlcResponseCode_INVALID_DATA, nil
}
values[stringAddress] = plcValue
}
}
}
// If there is only one address to read, return this directly.
// Otherwise, return a struct, with the keys being the string representations of the address.
if len(rawAddresses) == 1 {
stringAddress, err := NumericGroupAddressToString(rawAddresses[0], tag)
if err != nil {
m.log.Debug().Err(err).Msg("error mapping addresses")
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
return apiModel.PlcResponseCode_OK, values[stringAddress]
} else if len(rawAddresses) > 1 {
// Add it to the result
return apiModel.PlcResponseCode_OK, spiValues.NewPlcStruct(values)
} else {
// Add it to the result
return apiModel.PlcResponseCode_NOT_FOUND, nil
}
}
// If the given tag is a tag containing a pattern, resolve to all the possible addresses
// it could be referring to.
func (m *Reader) resolveAddresses(tag GroupAddressTag) ([]uint16, error) {
// Depending on the type of tag, get the uint16 ids of all values that match the current tag
var result []uint16
switch tag.(type) {
case GroupAddress3LevelPlcTag:
address3LevelPlcTag := tag.(GroupAddress3LevelPlcTag)
mainSegmentValues, err := m.resoleSegment(address3LevelPlcTag.MainGroup, 0, 31)
if err != nil {
return []uint16{}, err
}
middleSegmentValues, err := m.resoleSegment(address3LevelPlcTag.MiddleGroup, 0, 7)
if err != nil {
return []uint16{}, err
}
subSegmentValues, err := m.resoleSegment(address3LevelPlcTag.SubGroup, 0, 255)
if err != nil {
return []uint16{}, err
}
for _, main := range mainSegmentValues {
for _, middle := range middleSegmentValues {
for _, sub := range subSegmentValues {
result = append(result, main<<11|middle<<8|sub)
}
}
}
case GroupAddress2LevelPlcTag:
address2LevelPlcTag := tag.(GroupAddress2LevelPlcTag)
mainSegmentValues, err := m.resoleSegment(address2LevelPlcTag.MainGroup, 0, 31)
if err != nil {
return []uint16{}, err
}
subSegmentValues, err := m.resoleSegment(address2LevelPlcTag.SubGroup, 0, 2047)
if err != nil {
return []uint16{}, err
}
for _, main := range mainSegmentValues {
for _, sub := range subSegmentValues {
result = append(result, main<<11|sub)
}
}
case GroupAddress1LevelPlcTag:
address1LevelPlcTag := tag.(GroupAddress1LevelPlcTag)
mainSegmentValues, err := m.resoleSegment(address1LevelPlcTag.MainGroup, 0, 65535)
if err != nil {
return []uint16{}, err
}
for _, main := range mainSegmentValues {
result = append(result, main)
}
}
return result, nil
}
func (m *Reader) resoleSegment(pattern string, minValue uint16, maxValue uint16) ([]uint16, error) {
var results []uint16
// A "*" simply matches everything
if pattern == "*" {
for i := minValue; i <= maxValue; i++ {
results = append(results, i)
}
} else if strings.HasPrefix(pattern, "[") && strings.HasSuffix(pattern, "]") {
// If the pattern starts and ends with square brackets, it's a list of values or range queries
// Multiple options are separated by ","
for _, segment := range strings.Split(pattern[1:len(pattern)-1], ",") {
// If the segment contains a "-", then it's a range query,
// otherwise it's just a normal value.
if strings.Contains(segment, "-") {
split := strings.Split(segment, "-")
if len(split) == 2 {
minValue, err := strconv.ParseUint(split[0], 10, 16)
if err != nil {
return []uint16{}, errors.New("invalid address")
}
maxValue, err := strconv.ParseUint(split[1], 10, 16)
if err != nil {
return []uint16{}, errors.New("invalid address")
}
for i := uint16(minValue); i <= uint16(maxValue); i++ {
results = append(results, i)
}
} else {
return []uint16{}, errors.New("invalid address")
}
} else {
value, err := strconv.ParseUint(segment, 10, 16)
if err != nil {
return []uint16{}, errors.New("invalid address")
}
results = append(results, uint16(value))
}
}
} else {
value, err := strconv.ParseUint(pattern, 10, 16)
if err != nil {
return []uint16{}, errors.New("invalid address")
}
results = append(results, uint16(value))
}
return results, nil
}