| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package knxnetip |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/hex" |
| "fmt" |
| "runtime/debug" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/pkg/errors" |
| "github.com/rs/zerolog" |
| |
| "github.com/apache/plc4x/plc4go/pkg/api" |
| apiModel "github.com/apache/plc4x/plc4go/pkg/api/model" |
| "github.com/apache/plc4x/plc4go/pkg/api/values" |
| driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model" |
| "github.com/apache/plc4x/plc4go/spi" |
| _default "github.com/apache/plc4x/plc4go/spi/default" |
| "github.com/apache/plc4x/plc4go/spi/interceptors" |
| spiModel "github.com/apache/plc4x/plc4go/spi/model" |
| "github.com/apache/plc4x/plc4go/spi/options" |
| "github.com/apache/plc4x/plc4go/spi/tracer" |
| "github.com/apache/plc4x/plc4go/spi/transports" |
| ) |
| |
| //go:generate go tool plc4xGenerator -type=ConnectionMetadata |
| type ConnectionMetadata struct { |
| KnxMedium driverModel.KnxMedium `stringer:"true"` |
| GatewayName string |
| GatewayKnxAddress string |
| ClientKnxAddress string |
| |
| ProjectNumber uint8 |
| InstallationNumber uint8 |
| DeviceSerialNumber []byte |
| DeviceMulticastAddress []byte |
| DeviceMacAddress []byte |
| SupportedServices []string |
| } |
| |
| func (m *ConnectionMetadata) GetConnectionAttributes() map[string]string { |
| return map[string]string{ |
| "KnxMedium": m.KnxMedium.String(), |
| "GatewayName": m.GatewayName, |
| "GatewayKnxAddress": m.GatewayKnxAddress, |
| "ClientKnxAddress": m.ClientKnxAddress, |
| |
| "ProjectNumber": strconv.Itoa(int(m.ProjectNumber)), |
| "InstallationNumber": strconv.Itoa(int(m.InstallationNumber)), |
| "DeviceSerialNumber": ByteArrayToString(m.DeviceSerialNumber, " "), |
| "DeviceMulticastAddress": ByteArrayToString(m.DeviceSerialNumber, "."), |
| "DeviceMacAddress": ByteArrayToString(m.DeviceSerialNumber, ":"), |
| "SupportedServices": strings.Join(m.SupportedServices, ", "), |
| } |
| } |
| |
| func (m *ConnectionMetadata) CanRead() bool { |
| return true |
| } |
| |
| func (m *ConnectionMetadata) CanWrite() bool { |
| return true |
| } |
| |
| func (m *ConnectionMetadata) CanSubscribe() bool { |
| return true |
| } |
| |
| func (m *ConnectionMetadata) CanBrowse() bool { |
| return true |
| } |
| |
| type KnxDeviceConnection struct { |
| counter uint8 |
| deviceDescriptor uint16 |
| maxApdu uint16 |
| } |
| |
| type KnxMemoryReadFragment struct { |
| numElements uint8 |
| startingAddress uint16 |
| } |
| |
| type Connection struct { |
| messageCodec spi.MessageCodec |
| options map[string][]string |
| tagHandler spi.PlcTagHandler |
| valueHandler spi.PlcValueHandler |
| connectionStateTimer *time.Ticker |
| quitConnectionStateTimer chan struct{} |
| subscribers []*Subscriber |
| |
| valueCache map[uint16][]byte |
| valueCacheMutex sync.RWMutex |
| metadata *ConnectionMetadata |
| defaultTtl time.Duration |
| connectionTtl time.Duration |
| buildingKey []byte |
| |
| // Used for detecting connection problems |
| connectionTimeoutTimer *time.Timer |
| |
| GatewayKnxAddress driverModel.KnxAddress |
| ClientKnxAddress driverModel.KnxAddress |
| CommunicationChannelId uint8 |
| SequenceCounter int32 |
| TunnelingRequestExpectationId int32 |
| DeviceConnections map[driverModel.KnxAddress]*KnxDeviceConnection |
| |
| requestInterceptor interceptors.RequestInterceptor |
| sync.Mutex |
| |
| // indicates if the tunneling requests loop is running |
| handleTunnelingRequests bool |
| |
| connectionId string |
| tracer tracer.Tracer |
| |
| wg sync.WaitGroup // use to track spawned go routines |
| |
| passLogToModel bool |
| log zerolog.Logger |
| _options []options.WithOption // Used to pass them downstream |
| } |
| |
| func (m *Connection) String() string { |
| return fmt.Sprintf("knx.Connection{}") |
| } |
| |
| type KnxReadResult struct { |
| value values.PlcValue |
| numItems uint8 |
| err error |
| } |
| |
| type KnxDeviceConnectResult struct { |
| connection *KnxDeviceConnection |
| err error |
| } |
| |
| type KnxDeviceDisconnectResult struct { |
| connection *KnxDeviceConnection |
| err error |
| } |
| |
| type KnxDeviceAuthenticateResult struct { |
| err error |
| } |
| |
| type InternalResult struct { |
| responseMessage spi.Message |
| err error |
| } |
| |
| func NewConnection(transportInstance transports.TransportInstance, connectionOptions map[string][]string, tagHandler spi.PlcTagHandler, _options ...options.WithOption) *Connection { |
| passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...) |
| customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) |
| connection := &Connection{ |
| options: connectionOptions, |
| tagHandler: tagHandler, |
| valueHandler: NewValueHandler(), |
| requestInterceptor: interceptors.NewSingleItemRequestInterceptor( |
| spiModel.NewDefaultPlcReadRequest, |
| spiModel.NewDefaultPlcWriteRequest, |
| spiModel.NewDefaultPlcReadResponse, |
| spiModel.NewDefaultPlcWriteResponse, |
| _options..., |
| ), |
| subscribers: []*Subscriber{}, |
| valueCache: map[uint16][]byte{}, |
| valueCacheMutex: sync.RWMutex{}, |
| metadata: &ConnectionMetadata{}, |
| defaultTtl: 10 * time.Second, |
| DeviceConnections: map[driverModel.KnxAddress]*KnxDeviceConnection{}, |
| handleTunnelingRequests: true, |
| passLogToModel: passLoggerToModel, |
| log: customLogger, |
| _options: _options, |
| } |
| connection.connectionTtl = connection.defaultTtl * 2 |
| |
| if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok { |
| if len(traceEnabledOption) == 1 { |
| connection.tracer = tracer.NewTracer(connection.connectionId, _options...) |
| } |
| } |
| // If a building key was provided, save that in a dedicated variable |
| if buildingKey, ok := connectionOptions["buildingKey"]; ok { |
| bc, err := hex.DecodeString(buildingKey[0]) |
| if err == nil { |
| connection.buildingKey = bc |
| } |
| } |
| connection.messageCodec = NewMessageCodec(transportInstance, connection.interceptIncomingMessage) |
| return connection |
| } |
| |
| func (m *Connection) GetConnectionId() string { |
| return m.connectionId |
| } |
| |
| func (m *Connection) IsTraceEnabled() bool { |
| return m.tracer != nil |
| } |
| |
| func (m *Connection) GetTracer() tracer.Tracer { |
| return m.tracer |
| } |
| |
| func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult { |
| return m.ConnectWithContext(context.Background()) |
| } |
| |
| func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult { |
| result := make(chan plc4go.PlcConnectionConnectResult, 1) |
| sendResult := func(connection plc4go.PlcConnection, err error) { |
| result <- _default.NewDefaultPlcConnectionConnectResult(connection, err) |
| } |
| |
| m.wg.Go(func() { |
| defer func() { |
| if err := recover(); err != nil { |
| result <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack())) |
| } |
| }() |
| // Open the UDP Connection |
| err := m.messageCodec.ConnectWithContext(ctx) |
| if err != nil { |
| m.doSomethingAndClose(func() { sendResult(nil, errors.Wrap(err, "error opening connection")) }) |
| return |
| } |
| |
| // Send a search request before connecting to the device. |
| searchResponse, err := m.sendGatewaySearchRequest(ctx) |
| if err != nil { |
| m.doSomethingAndClose(func() { sendResult(nil, errors.Wrap(err, "error discovering device capabilities")) }) |
| return |
| } |
| |
| // Save some important information |
| dibDeviceInfo := searchResponse.GetDibDeviceInfo() |
| m.metadata.KnxMedium = dibDeviceInfo.GetKnxMedium() |
| m.metadata.GatewayName = string(bytes.Trim(dibDeviceInfo.GetDeviceFriendlyName(), "\x00")) |
| m.GatewayKnxAddress = dibDeviceInfo.GetKnxAddress() |
| m.metadata.GatewayKnxAddress = KnxAddressToString(m.GatewayKnxAddress) |
| m.metadata.ProjectNumber = dibDeviceInfo.GetProjectInstallationIdentifier().GetProjectNumber() |
| m.metadata.InstallationNumber = dibDeviceInfo.GetProjectInstallationIdentifier().GetInstallationNumber() |
| m.metadata.DeviceSerialNumber = dibDeviceInfo.GetKnxNetIpDeviceSerialNumber() |
| m.metadata.DeviceMulticastAddress = dibDeviceInfo.GetKnxNetIpDeviceMulticastAddress().GetAddr() |
| m.metadata.DeviceMacAddress = dibDeviceInfo.GetKnxNetIpDeviceMacAddress().GetAddr() |
| m.metadata.SupportedServices = []string{} |
| supportsTunneling := false |
| for _, serviceId := range searchResponse.GetDibSuppSvcFamilies().GetServiceIds() { |
| m.metadata.SupportedServices = append(m.metadata.SupportedServices, serviceId.(interface{ GetTypeName() string }).GetTypeName()) |
| // If this is an instance of the "tunneling", service, this connection supports tunneling |
| _, ok := serviceId.(driverModel.KnxNetIpTunneling) |
| if ok { |
| supportsTunneling = true |
| break |
| } |
| } |
| |
| // If the current device supports tunneling, create a tunneling connection. |
| // Via this connection we then get access to the entire KNX network this Gateway is connected to. |
| if supportsTunneling { |
| // As soon as we got a successful search-response back, send a connection request. |
| connectionResponse, err := m.sendGatewayConnectionRequest(ctx) |
| if err != nil { |
| m.doSomethingAndClose(func() { sendResult(nil, errors.Wrap(err, "error connecting to device")) }) |
| return |
| } |
| |
| // Save the communication channel id |
| m.CommunicationChannelId = connectionResponse.GetCommunicationChannelId() |
| |
| // Reset the sequence counter |
| m.SequenceCounter = -1 |
| |
| // If the connection was successful, the gateway will now forward any packets |
| // on the KNX bus that are broadcast packets to us, so we have to setup things |
| // to handle these incoming messages. |
| switch connectionResponse.GetStatus() { |
| case driverModel.Status_NO_ERROR: |
| // Save the KNX Address the Gateway assigned to us for this connection. |
| tunnelConnectionDataBlock := connectionResponse.GetConnectionResponseDataBlock().(driverModel.ConnectionResponseDataBlockTunnelConnection) |
| m.ClientKnxAddress = tunnelConnectionDataBlock.GetKnxAddress() |
| |
| // Create a go routine to handle incoming tunneling-requests which haven't been |
| // handled by any other handler. This is where usually the GroupValueWrite messages |
| // are being handled. |
| m.log.Debug().Msg("Starting tunneling handler") |
| m.wg.Go(func() { |
| defer func() { |
| if err := recover(); err != nil { |
| m.log.Error(). |
| Str("stack", string(debug.Stack())). |
| Interface("err", err). |
| Msg("panic-ed") |
| } |
| }() |
| defaultIncomingMessageChannel := m.messageCodec.GetDefaultIncomingMessageChannel() |
| for m.handleTunnelingRequests { |
| incomingMessage := <-defaultIncomingMessageChannel |
| tunnelingRequest, ok := incomingMessage.(driverModel.TunnelingRequest) |
| if !ok { |
| tunnelingResponse, ok := incomingMessage.(driverModel.TunnelingResponse) |
| if ok { |
| m.log.Warn().Stringer("tunnelingResponse", tunnelingResponse).Msg("Got an unhandled TunnelingResponse message") |
| } else { |
| m.log.Warn().Stringer("incomingMessage", incomingMessage).Msg("Not a TunnelingRequest or TunnelingResponse message") |
| } |
| continue |
| } |
| |
| if tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { |
| m.log.Warn().Stringer("tunnelingRequest", tunnelingRequest).Msg("Not for this connection") |
| continue |
| } |
| |
| lDataInd, ok := tunnelingRequest.GetCemi().(driverModel.LDataInd) |
| if !ok { |
| continue |
| } |
| // Get APDU, source and target address |
| lDataFrameData := lDataInd.GetDataFrame().(driverModel.LDataExtended) |
| sourceAddress := lDataFrameData.GetSourceAddress() |
| |
| // If this is not an APDU, there is no need to further handle it. |
| if lDataFrameData.GetApdu() == nil { |
| continue |
| } |
| |
| // If this is an incoming disconnect request, remove the device |
| // from the device connections, otherwise handle it as normal |
| // incoming message. |
| apduControlContainer, ok := lDataFrameData.GetApdu().(driverModel.ApduControlContainer) |
| if ok { |
| _, ok := apduControlContainer.GetControlApdu().(driverModel.ApduControlDisconnect) |
| if ok { |
| if m.DeviceConnections[sourceAddress] != nil /* && m.ClientKnxAddress == Int8ArrayToKnxAddress(targetAddress)*/ { |
| // Remove the connection |
| delete(m.DeviceConnections, sourceAddress) |
| } |
| } |
| } else { |
| m.handleIncomingTunnelingRequest(ctx, tunnelingRequest) |
| } |
| } |
| m.log.Warn().Msg("Tunneling handler shat down") |
| }) |
| |
| // Fire the "connected" event |
| sendResult(m, nil) |
| case driverModel.Status_NO_MORE_CONNECTIONS: |
| m.doSomethingAndClose(func() { sendResult(nil, errors.New("no more connections")) }) |
| default: |
| m.doSomethingAndClose(func() { sendResult(nil, errors.Errorf("got a return status of: %s", connectionResponse.GetStatus())) }) |
| } |
| } else { |
| m.doSomethingAndClose(func() { sendResult(nil, errors.New("this device doesn't support tunneling")) }) |
| } |
| }) |
| |
| return result |
| } |
| |
| func (m *Connection) doSomethingAndClose(something func()) { |
| something() |
| err := m.messageCodec.Disconnect() |
| if err != nil { |
| m.log.Warn().Err(err).Msg("error closing connection") |
| } |
| } |
| |
| func (m *Connection) BlockingClose() { |
| ttlTimer := time.NewTimer(m.defaultTtl) |
| closeResults := m.Close() |
| select { |
| case <-closeResults: |
| if !ttlTimer.Stop() { |
| <-ttlTimer.C |
| } |
| return |
| case <-ttlTimer.C: |
| ttlTimer.Stop() |
| return |
| } |
| } |
| |
| func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult { |
| // TODO: use proper context |
| ctx := context.TODO() |
| result := make(chan plc4go.PlcConnectionCloseResult, 1) |
| |
| m.wg.Go(func() { |
| defer func() { |
| if err := recover(); err != nil { |
| result <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack())) |
| } |
| }() |
| // Stop the connection-state checker. |
| if m.connectionStateTimer != nil { |
| m.connectionStateTimer.Stop() |
| } |
| |
| // Disconnect from all knx devices we are still connected to. |
| for targetAddress := range m.DeviceConnections { |
| ttlTimer := time.NewTimer(m.defaultTtl) |
| disconnects := m.DeviceDisconnect(ctx, targetAddress) |
| select { |
| case _ = <-disconnects: |
| if !ttlTimer.Stop() { |
| <-ttlTimer.C |
| } |
| case <-ttlTimer.C: |
| ttlTimer.Stop() |
| // If we got a timeout here, well just continue the device will just auto disconnect. |
| m.log.Debug().Str("targetAddress", KnxAddressToString(targetAddress)).Msg("Timeout disconnecting from device") |
| } |
| } |
| |
| // Send a disconnect request from the gateway. |
| _, err := m.sendGatewayDisconnectionRequest(ctx) |
| if err != nil { |
| result <- _default.NewDefaultPlcConnectionCloseResult(m, errors.Wrap(err, "got an error while disconnecting")) |
| } else { |
| result <- _default.NewDefaultPlcConnectionCloseResult(m, nil) |
| } |
| }) |
| |
| return result |
| } |
| |
| func (m *Connection) IsConnected() bool { |
| if m.messageCodec != nil { |
| ttlTimer := time.NewTimer(m.defaultTtl) |
| pingChannel := m.Ping() |
| select { |
| case pingResponse := <-pingChannel: |
| if !ttlTimer.Stop() { |
| <-ttlTimer.C |
| } |
| return pingResponse.GetErr() == nil |
| case <-ttlTimer.C: |
| ttlTimer.Stop() |
| m.handleTimeout() |
| return false |
| } |
| } |
| return false |
| } |
| |
| func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult { |
| // TODO: use proper context |
| ctx := context.TODO() |
| result := make(chan plc4go.PlcConnectionPingResult, 1) |
| |
| m.wg.Go(func() { |
| defer func() { |
| if err := recover(); err != nil { |
| result <- _default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack())) |
| } |
| }() |
| // Send the connection state request |
| _, err := m.sendConnectionStateRequest(ctx) |
| if err != nil { |
| result <- _default.NewDefaultPlcConnectionPingResult(errors.Wrap(err, "got an error")) |
| } else { |
| result <- _default.NewDefaultPlcConnectionPingResult(nil) |
| } |
| return |
| }) |
| |
| return result |
| } |
| |
| func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata { |
| return m.metadata |
| } |
| |
| func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder { |
| return spiModel.NewDefaultPlcReadRequestBuilder( |
| m.tagHandler, NewReader(m)) |
| } |
| |
| func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder { |
| return spiModel.NewDefaultPlcWriteRequestBuilder( |
| m.tagHandler, m.valueHandler, NewWriter(m.messageCodec)) |
| } |
| |
| func (m *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder { |
| return spiModel.NewDefaultPlcSubscriptionRequestBuilder( |
| m.tagHandler, |
| m.valueHandler, |
| NewSubscriber( |
| m, |
| append(m._options, options.WithCustomLogger(m.log))..., |
| ), |
| ) |
| } |
| |
| func (m *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder { |
| return spiModel.NewDefaultPlcBrowseRequestBuilder(m.tagHandler, NewBrowser(m, m.messageCodec)) |
| } |
| |
| func (m *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder { |
| return nil /*spiModel.NewDefaultPlcUnsubscriptionRequestBuilder( |
| m.tagHandler, m.valueHandler, NewSubscriber(m.messageCodec))*/ |
| } |
| |
| func (m *Connection) GetTransportInstance() transports.TransportInstance { |
| if mc, ok := m.messageCodec.(spi.TransportInstanceExposer); ok { |
| return mc.GetTransportInstance() |
| } |
| return nil |
| } |
| |
| func (m *Connection) GetPlcTagHandler() spi.PlcTagHandler { |
| return m.tagHandler |
| } |
| |
| func (m *Connection) GetPlcValueHandler() spi.PlcValueHandler { |
| return m.valueHandler |
| } |
| |
| func ByteArrayToString(data []byte, separator string) string { |
| var sb strings.Builder |
| if data != nil { |
| for i, element := range data { |
| sb.WriteString(strconv.Itoa(int(element))) |
| if i < (len(data) - 1) { |
| sb.WriteString(separator) |
| } |
| } |
| } |
| return sb.String() |
| } |