blob: 85a3f73b38365208f8c33538df273e269832301c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package cbus
import (
"context"
"fmt"
"runtime/debug"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
_default "github.com/apache/plc4x/plc4go/spi/default"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
)
type Browser struct {
_default.DefaultBrowser
connection plc4go.PlcConnection
sequenceCounter uint8
log zerolog.Logger
}
func NewBrowser(connection plc4go.PlcConnection, _options ...options.WithOption) *Browser {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
browser := &Browser{
connection: connection,
sequenceCounter: 0,
log: customLogger,
}
browser.DefaultBrowser = _default.NewDefaultBrowser(browser, _options...)
return browser
}
func (m *Browser) BrowseQuery(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query apiModel.PlcQuery) (responseCode apiModel.PlcResponseCode, queryResults []apiModel.PlcBrowseItem) {
switch query := query.(type) {
case *unitInfoQuery:
return m.browseUnitInfo(ctx, interceptor, queryName, query)
default:
m.log.Warn().Type("query", query).Msg("unsupported query type supplied %T")
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
}
func (m *Browser) browseUnitInfo(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query *unitInfoQuery) (responseCode apiModel.PlcResponseCode, queryResults []apiModel.PlcBrowseItem) {
m.log.Trace().Msg("extract units")
units, allUnits, err := m.extractUnits(ctx, query, m.getInstalledUnitAddressBytes)
if err != nil {
m.log.Error().Err(err).Msg("Error extracting units")
return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
}
attributes, allAttributes := m.extractAttributes(query)
if allUnits {
m.log.Info().Msg("Querying all (available) units")
} else {
m.log.Debug().Interface("units", units).Msg("Querying units")
}
unitLoop:
for _, unit := range units {
unitLog := m.log.With().Stringer("unit", unit).Logger()
unitLog.Trace().Msg("checking unit")
if err := ctx.Err(); err != nil {
unitLog.Info().Err(err).Msg("Aborting scan at unit")
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
unitAddress := unit.GetAddress()
if !allUnits && allAttributes {
m.log.Info().
Uint8("unitAddress", unitAddress).
Msg("Querying all attributes of unit")
}
level := zerolog.InfoLevel
if allUnits {
level = zerolog.DebugLevel
}
m.log.WithLevel(level).Uint8("unitAddress", unitAddress).Msg("Query unit")
for _, attribute := range attributes {
if err := ctx.Err(); err != nil {
unitLog.Info().Err(err).Msg("Aborting scan at unit")
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
if !allUnits && !allAttributes {
m.log.Info().
Uint8("unitAddress", unitAddress).
Stringer("attribute", attribute).
Msg("Querying attribute of unit")
} else {
m.log.WithLevel(level).Uint8("unitAddress", unitAddress).
Stringer("attribute", attribute).
Msg("unit unitAddress: Query attribute")
}
m.log.Trace().Msg("Building request")
readTagName := fmt.Sprintf("%s/%d/%s", queryName, unitAddress, attribute)
readRequest, _ := m.connection.ReadRequestBuilder().
AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
Build()
timeout := 5 * time.Second
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, timeout)
m.log.Trace().
Stringer("readRequest", readRequest).
Dur("timeout", timeout).
Msg("Executing readRequest with timeout")
requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
m.log.Trace().Stringer("requestResult", requestResult).Msg("got a response")
timeoutCancel()
if err := requestResult.GetErr(); err != nil {
if allUnits || allAttributes {
level = zerolog.TraceLevel
}
m.log.WithLevel(level).Err(err).
Uint8("unitAddress", unitAddress).
Stringer("attribute", attribute).
Msg("unit unitAddress: Can't read attribute attribute")
continue unitLoop
}
response := requestResult.GetResponse()
if code := response.GetResponseCode(readTagName); code != apiModel.PlcResponseCode_OK {
m.log.WithLevel(level).
Uint8("unitAddress", unitAddress).
Stringer("attribute", attribute).
Stringer("code", code).
Msg("unit unitAddress: error reading tag attribute. Code %s")
continue unitLoop
}
queryResult := spiModel.NewDefaultPlcBrowseItem(
NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1),
queryName,
"",
true,
false,
false,
nil,
map[string]values.PlcValue{
"CurrentValue": response.GetValue(readTagName),
},
)
if interceptor != nil {
m.log.Trace().Msg("forwarding query result to interceptor")
interceptor(queryResult)
}
queryResults = append(queryResults, queryResult)
}
}
return apiModel.PlcResponseCode_OK, queryResults
}
func (m *Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
if unitAddress := query.unitAddress; unitAddress != nil {
return []readWriteModel.UnitAddress{unitAddress}, false, nil
}
// TODO: check if we still want the option to brute force all addresses
installedUnitAddressBytes, err := getInstalledUnitAddressBytes(ctx)
if err != nil {
return nil, false, errors.New("Unable to get installed uints")
}
var units []readWriteModel.UnitAddress
for i := 0; i <= 0xFF; i++ {
unitAddressByte := byte(i)
if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
}
}
return units, true, nil
}
func (m *Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
if attribute := query.attribute; attribute != nil {
return []readWriteModel.Attribute{*attribute}, false
} else {
var attributes []readWriteModel.Attribute
for _, attribute := range readWriteModel.AttributeValues {
attributes = append(attributes, attribute)
}
return attributes, true
}
}
func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any, error) {
start := time.Now()
defer func() {
m.log.Debug().TimeDiff("duration", time.Now(), start).Msg("Ending unit address acquiring after duration")
}()
// We need to pre-subscribe to catch the 2 followup responses
subscriptionRequest, err := m.connection.SubscriptionRequestBuilder().
AddEventTagAddress("installationMMIMonitor", "mmimonitor/*/NETWORK_CONTROL").
Build()
if err != nil {
return nil, errors.Wrap(err, "Error subscribing to the installation MMI")
}
subCtx, subCtxCancel := context.WithTimeout(ctx, 2*time.Second)
defer subCtxCancel()
subscriptionResult := <-subscriptionRequest.ExecuteWithContext(subCtx)
if err := subscriptionResult.GetErr(); err != nil {
return nil, errors.Wrap(err, "Error subscribing to the mmi")
}
response := subscriptionResult.GetResponse()
if responseCode := response.GetResponseCode("installationMMIMonitor"); responseCode != apiModel.PlcResponseCode_OK {
return nil, errors.Errorf("Got %s", responseCode)
}
subscriptionHandle, err := response.GetSubscriptionHandle("installationMMIMonitor")
if err != nil {
return nil, errors.Wrap(err, "Error getting the subscription handle")
}
build, err := m.connection.UnsubscriptionRequestBuilder().AddHandles(subscriptionHandle).Build()
if err != nil {
return nil, errors.Wrap(err, "Error building unsubscription request")
}
defer build.ExecuteWithContext(ctx)
blockOffset0Received := false
blockOffset0ReceivedChan := make(chan any, 100) // We only expect one, but we make it a bit bigger to no clog up
blockOffset88Received := false
blockOffset88ReceivedChan := make(chan any, 100) // We only expect one, but we make it a bit bigger to no clog up
blockOffset176Received := false
blockOffset176ReceivedChan := make(chan any, 100) // We only expect one, but we make it a bit bigger to no clog up
result := make(map[byte]any)
plcConsumerRegistration := subscriptionHandle.Register(func(event apiModel.PlcSubscriptionEvent) {
m.log.Trace().Stringer("event", event).Msg("handling event")
if responseCode := event.GetResponseCode("installationMMIMonitor"); responseCode != apiModel.PlcResponseCode_OK {
m.log.Warn().Stringer("event", event).Msg("Ignoring")
return
}
rootValue := event.GetValue("installationMMIMonitor")
if !rootValue.IsStruct() {
m.log.Warn().Stringer("rootValue", rootValue).Msg("Ignoring rootValue should be a struct")
return
}
rootStruct := rootValue.GetStruct()
if applicationValue := rootStruct["application"]; applicationValue == nil || !applicationValue.IsString() || applicationValue.GetString() != "NETWORK_CONTROL" {
m.log.Warn().
Interface("rootStruct", rootStruct).
Msg("Ignoring rootStruct should contain a application tag of type string with value NETWORK_CONTROL")
return
}
var blockStart int
if blockStartValue := rootStruct["blockStart"]; blockStartValue == nil || !blockStartValue.IsByte() {
m.log.Warn().
Interface("rootStruct", rootStruct).
Msg("Ignoring rootStruct should contain a blockStart tag of type byte")
return
} else {
blockStart = int(blockStartValue.GetByte())
}
if plcListValue := rootStruct["values"]; plcListValue == nil || !plcListValue.IsList() {
m.log.Warn().
Interface("rootStruct", rootStruct).
Msg("Ignoring rootStruct should contain a values tag of type list")
return
} else {
for unitByteAddress, plcValue := range plcListValue.GetList() {
unitByteAddress = blockStart + unitByteAddress
if !plcValue.IsString() {
m.log.Warn().
Stringer("plcValue", plcValue).
Int("unitByteAddress", unitByteAddress).
Msg("Ignoring plcValue at unitByteAddress should be a string")
return
}
switch plcValue.GetString() {
case readWriteModel.GAVState_ON.PLC4XEnumName(), readWriteModel.GAVState_OFF.PLC4XEnumName():
m.log.Debug().
Int("unitByteAddress", unitByteAddress).
Msg("unit does exists")
result[byte(unitByteAddress)] = true
case readWriteModel.GAVState_DOES_NOT_EXIST.PLC4XEnumName():
m.log.Debug().
Int("unitByteAddress", unitByteAddress).
Msg("unit does not exists")
case readWriteModel.GAVState_ERROR.PLC4XEnumName():
m.log.Warn().
Int("unitByteAddress", unitByteAddress).
Msg("unit is in error state")
}
}
}
// We notify here so we don't exit to early
switch blockStart {
case 0:
select {
case blockOffset0ReceivedChan <- true:
m.log.Trace().Msg("0 notified")
default:
m.log.Warn().Msg("0 blocked")
}
case 88:
select {
case blockOffset88ReceivedChan <- true:
m.log.Trace().Msg("88 notified")
default:
m.log.Warn().Msg("88 blocked")
}
case 176:
select {
case blockOffset176ReceivedChan <- true:
m.log.Trace().Msg("176 notified")
default:
m.log.Warn().Msg("176 blocked")
}
}
})
defer plcConsumerRegistration.Unregister()
readRequest, err := m.connection.ReadRequestBuilder().
AddTagAddress("installationMMI", "status/binary/0xFF").
Build()
if err != nil {
return nil, errors.Wrap(err, "Error building the installation MMI")
}
readCtx, readCtxCancel := context.WithTimeout(ctx, 2*time.Second)
defer readCtxCancel()
readWg := new(sync.WaitGroup)
readWg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
Str("stack", string(debug.Stack())).
Interface("err", err).
Msg("panic-ed")
}
}()
defer readCtxCancel()
m.log.Debug().Stringer("readRequest", readRequest).Msg("sending read request")
readRequestResult := <-readRequest.ExecuteWithContext(readCtx)
if err := readRequestResult.GetErr(); err != nil {
m.log.Warn().Err(err).Msg("Error reading the mmi")
return
}
response := readRequestResult.GetResponse()
if responseCode := response.GetResponseCode("installationMMI"); responseCode == apiModel.PlcResponseCode_OK {
rootValue := response.GetValue("installationMMI")
if !rootValue.IsStruct() {
m.log.Warn().Err(err).Stringer("rootValue", rootValue).Msg("%v should be a struct")
return
}
rootStruct := rootValue.GetStruct()
if applicationValue := rootStruct["application"]; applicationValue == nil || !applicationValue.IsString() || applicationValue.GetString() != "NETWORK_CONTROL" {
m.log.Warn().Err(err).
Interface("rootStruct", rootStruct).
Msg("%v should contain a application tag of type string with value NETWORK_CONTROL")
return
}
var blockStart int
if blockStartValue := rootStruct["blockStart"]; blockStartValue == nil || !blockStartValue.IsByte() || blockStartValue.GetByte() != 0 {
m.log.Warn().Err(err).
Interface("rootStruct", rootStruct).
Msg("rootStruct should contain a blockStart tag of type byte with value 0")
return
} else {
blockStart = int(blockStartValue.GetByte())
}
m.log.Debug().Int("blockStart", blockStart).Msg("Read MMI with block start")
if plcListValue := rootStruct["values"]; plcListValue == nil || !plcListValue.IsList() {
m.log.Warn().Err(err).
Interface("rootStruct", rootStruct).
Msg("rootStruct should contain a values tag of type list")
return
} else {
for unitByteAddress, plcValue := range plcListValue.GetList() {
unitByteAddress = blockStart + unitByteAddress
if !plcValue.IsString() {
m.log.Warn().Err(err).
Stringer("plcValue", plcValue).
Int("unitByteAddress", unitByteAddress).
Msg("plcValue at unitByteAddress should be a string")
return
}
switch plcValue.GetString() {
case readWriteModel.GAVState_ON.PLC4XEnumName(), readWriteModel.GAVState_OFF.PLC4XEnumName():
m.log.Debug().
Int("unitByteAddress", unitByteAddress).
Msg("unit does exists")
result[byte(unitByteAddress)] = true
case readWriteModel.GAVState_DOES_NOT_EXIST.PLC4XEnumName():
m.log.Debug().
Int("unitByteAddress", unitByteAddress).
Msg("unit does not exists")
case readWriteModel.GAVState_ERROR.PLC4XEnumName():
m.log.Warn().
Int("unitByteAddress", unitByteAddress).
Msg("unit is in error state")
}
}
}
switch blockStart {
case 0:
blockOffset0Received = true
m.log.Trace().Msg("block 0 read by read")
case 88:
blockOffset88Received = true
m.log.Trace().Msg("block 88 read by read")
case 176:
blockOffset176Received = true
m.log.Trace().Msg("block 176 read by read")
}
} else {
m.log.Warn().
Stringer("responseCode", responseCode).
Msg("We got responseCode as response code for installation mmi so we rely on getting it via subscription")
}
})
syncCtx, syncCtxCancel := context.WithTimeout(ctx, 6*time.Second)
defer syncCtxCancel()
for !blockOffset0Received || !blockOffset88Received || !blockOffset176Received {
select {
case <-blockOffset0ReceivedChan:
m.log.Trace().Msg("Offset 0 received")
blockOffset0Received = true
case <-blockOffset88ReceivedChan:
m.log.Trace().Msg("Offset 88 received")
blockOffset88Received = true
case <-blockOffset176ReceivedChan:
m.log.Trace().Msg("Offset 176 received")
blockOffset176Received = true
case <-syncCtx.Done():
err = syncCtx.Err()
m.log.Trace().Err(err).Msg("Ending prematurely")
return nil, errors.Wrap(err, "error waiting for other offsets")
}
}
readWg.Wait()
return result, nil
}