blob: af96d746b7cbe311c298396f916fca5f56ec3784 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package knxnetip
import (
"bytes"
"context"
stdErrors "errors"
"fmt"
"net"
"net/url"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/udp"
"github.com/apache/plc4x/plc4go/spi/utils"
)
type Discoverer struct {
transportInstanceCreationWorkItemId atomic.Int32
transportInstanceCreationQueue pool.Executor
deviceScanningWorkItemId atomic.Int32
deviceScanningQueue pool.Executor
wg sync.WaitGroup // use to track spawned go routines
log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
}
func NewDiscoverer(_options ...options.WithOption) *Discoverer {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
return &Discoverer{
// TODO: maybe a dynamic executor would be better to not waste cycles when not in use
transportInstanceCreationQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
deviceScanningQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
log: customLogger,
_options: _options,
}
}
func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
d.transportInstanceCreationQueue.Start()
d.deviceScanningQueue.Start()
udpTransport := udp.NewTransport()
// Create a connection string for the KNX broadcast discovery address.
connectionUrl, err := url.Parse("udp://224.0.23.12:3671")
if err != nil {
return err
}
allInterfaces, err := net.Interfaces()
if err != nil {
return err
}
// If no device is explicitly selected via option, simply use all of them
// However if a discovery option is present to select a device by name, only
// add those devices matching any of the given names.
var interfaces []net.Interface
deviceNames := options.FilterDiscoveryOptionsDeviceName(discoveryOptions)
if len(deviceNames) > 0 {
for _, curInterface := range allInterfaces {
if err := ctx.Err(); err != nil {
return err
}
for _, deviceNameOption := range deviceNames {
if err := ctx.Err(); err != nil {
return err
}
if curInterface.Name == deviceNameOption.GetDeviceName() {
interfaces = append(interfaces, curInterface)
break
}
}
}
} else {
interfaces = allInterfaces
}
transportInstances := make(chan transports.TransportInstance)
wg := &sync.WaitGroup{}
// Iterate over all network devices of this system.
for _, netInterface := range interfaces {
if err := ctx.Err(); err != nil {
return err
}
addrs, err := netInterface.Addrs()
if err != nil {
return err
}
wg.Go(func() {
defer func() {
if err := recover(); err != nil {
d.log.Error().
Str("stack", string(debug.Stack())).
Interface("err", err).
Msg("panic-ed")
}
}()
// Iterate over all addresses the current interface has configured
// For KNX we're only interested in IPv4 addresses, as it doesn't
// seem to work with IPv6.
for _, addr := range addrs {
if err := ctx.Err(); err != nil {
d.log.Debug().Err(err).Msg("done")
return
}
var ipv4Addr net.IP
switch addr.(type) {
// If the device is configured to communicate with a subnet
case *net.IPNet:
ipv4Addr = addr.(*net.IPNet).IP.To4()
// If the device is configured for a point-to-point connection
case *net.IPAddr:
ipv4Addr = addr.(*net.IPAddr).IP.To4()
}
// If we found an IPv4 address and this is not a loopback address,
// add it to the list of devices we will open ports and send discovery
// messages from.
if ipv4Addr == nil || ipv4Addr.IsLoopback() {
continue
}
d.transportInstanceCreationQueue.Submit(ctx, d.transportInstanceCreationWorkItemId.Add(1), d.createTransportInstanceDispatcher(ctx, wg, connectionUrl, ipv4Addr, udpTransport, transportInstances))
}
})
}
d.wg.Go(func() {
wg.Wait()
d.log.Trace().Msg("Closing transport instance channel")
close(transportInstances)
})
d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
d.log.Error().
Str("stack", string(debug.Stack())).
Interface("err", err).
Msg("panic-ed")
}
}()
for transportInstance := range transportInstances {
if transportInstance == nil {
d.log.Trace().Msg("channel closed")
break
}
d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(ctx, transportInstance.(*udp.TransportInstance), callback))
}
})
return nil
}
func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport *udp.Transport, transportInstances chan transports.TransportInstance) pool.Runnable {
wg.Add(1)
return func(workerCtx context.Context) {
ctx, cancel := context.WithCancel(ctx)
context.AfterFunc(workerCtx, cancel)
defer wg.Done()
// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
transportInstance, err :=
udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil,
&net.UDPAddr{IP: ipv4Addr, Port: 0})
if err != nil {
d.log.Error().Err(err).Msg("error creating transport instance")
return
}
err = transportInstance.Connect(ctx)
if err != nil {
d.log.Debug().Err(err).Msg("Error Connecting")
return
}
d.log.Debug().Interface("transportInstance", transportInstance).Msg("Adding transport instance to scan %v")
transportInstances <- transportInstance
}
}
func (d *Discoverer) createDeviceScanDispatcher(ctx context.Context, udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) pool.Runnable {
return func(workerCtx context.Context) {
ctx, cancel := context.WithCancel(ctx)
context.AfterFunc(workerCtx, cancel)
d.log.Debug().Interface("udpTransportInstance", udpTransportInstance).Msg("Scanning")
// Create a codec for sending and receiving messages.
codec := NewMessageCodec(
udpTransportInstance,
nil,
append(d._options, options.WithCustomLogger(d.log))...,
)
// Explicitly start the worker
if err := codec.Connect(ctx); err != nil {
d.log.Error().Err(err).Msg("Error connecting")
return
}
localAddress := udpTransportInstance.LocalAddress
localAddr := driverModel.NewIPAddress(localAddress.IP)
// Prepare the discovery packet data
discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint(
driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
// Send the search request.
if err := codec.Send(ctx, "device_scan_search_request", searchRequestMessage); err != nil {
d.log.Debug().Err(err).Interface("searchRequestMessage", searchRequestMessage).Msg("Error sending message")
return
}
// Keep on reading responses till the timeout is done.
// TODO: Make this configurable
timeout := time.NewTimer(1 * time.Second)
timeout.Stop()
for start := time.Now(); time.Since(start) < time.Second*5; {
if err := ctx.Err(); err != nil {
d.log.Debug().Err(err).Msg("done")
return
}
timeout.Reset(1 * time.Second)
select {
case message := <-codec.GetDefaultIncomingMessageChannel():
{
if !timeout.Stop() {
<-timeout.C
}
searchResponse := message.(driverModel.SearchResponse)
if searchResponse != nil {
addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort()))
if err != nil {
continue
}
deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
discoveryEvent := spiModel.NewDefaultPlcDiscoveryItem(
"knxnet-ip",
"udp",
*remoteUrl,
nil,
deviceName,
nil,
)
// Pass the event back to the callback
callback(discoveryEvent)
}
continue
}
case <-timeout.C:
{
timeout.Stop()
continue
}
}
}
}
}
func (d *Discoverer) Close() error {
defer utils.StopWarn(d.log)()
d.log.Trace().Msg("Closing discoverer")
var collectedErrors []error
d.log.Trace().Msg("Closing transport instance creation queue")
if err := d.transportInstanceCreationQueue.Close(); err != nil {
collectedErrors = append(collectedErrors, errors.Wrap(err, "error closing transport instance creation queue"))
}
d.log.Trace().Msg("Closing device scanning queue")
if err := d.deviceScanningQueue.Close(); err != nil {
collectedErrors = append(collectedErrors, errors.Wrap(err, "error closing device scanning queue"))
}
d.log.Trace().Msg("waiting for wait group")
d.wg.Wait()
if err := stdErrors.Join(collectedErrors...); err != nil {
return errors.Wrap(err, "error closing discoverer")
}
return nil
}