| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package knxnetip |
| |
| import ( |
| "bytes" |
| "context" |
| stdErrors "errors" |
| "fmt" |
| "net" |
| "net/url" |
| "runtime/debug" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "github.com/pkg/errors" |
| "github.com/rs/zerolog" |
| |
| apiModel "github.com/apache/plc4x/plc4go/pkg/api/model" |
| driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model" |
| spiModel "github.com/apache/plc4x/plc4go/spi/model" |
| "github.com/apache/plc4x/plc4go/spi/options" |
| "github.com/apache/plc4x/plc4go/spi/pool" |
| "github.com/apache/plc4x/plc4go/spi/transports" |
| "github.com/apache/plc4x/plc4go/spi/transports/udp" |
| "github.com/apache/plc4x/plc4go/spi/utils" |
| ) |
| |
| type Discoverer struct { |
| transportInstanceCreationWorkItemId atomic.Int32 |
| transportInstanceCreationQueue pool.Executor |
| deviceScanningWorkItemId atomic.Int32 |
| deviceScanningQueue pool.Executor |
| |
| wg sync.WaitGroup // use to track spawned go routines |
| |
| log zerolog.Logger |
| _options []options.WithOption // Used to pass them downstream |
| } |
| |
| func NewDiscoverer(_options ...options.WithOption) *Discoverer { |
| customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) |
| return &Discoverer{ |
| // TODO: maybe a dynamic executor would be better to not waste cycles when not in use |
| transportInstanceCreationQueue: pool.NewFixedSizeExecutor(50, 100, _options...), |
| deviceScanningQueue: pool.NewFixedSizeExecutor(50, 100, _options...), |
| log: customLogger, |
| _options: _options, |
| } |
| } |
| |
| func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error { |
| d.transportInstanceCreationQueue.Start() |
| d.deviceScanningQueue.Start() |
| |
| udpTransport := udp.NewTransport() |
| |
| // Create a connection string for the KNX broadcast discovery address. |
| connectionUrl, err := url.Parse("udp://224.0.23.12:3671") |
| if err != nil { |
| return err |
| } |
| |
| allInterfaces, err := net.Interfaces() |
| if err != nil { |
| return err |
| } |
| |
| // If no device is explicitly selected via option, simply use all of them |
| // However if a discovery option is present to select a device by name, only |
| // add those devices matching any of the given names. |
| var interfaces []net.Interface |
| deviceNames := options.FilterDiscoveryOptionsDeviceName(discoveryOptions) |
| if len(deviceNames) > 0 { |
| for _, curInterface := range allInterfaces { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| for _, deviceNameOption := range deviceNames { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| if curInterface.Name == deviceNameOption.GetDeviceName() { |
| interfaces = append(interfaces, curInterface) |
| break |
| } |
| } |
| } |
| } else { |
| interfaces = allInterfaces |
| } |
| |
| transportInstances := make(chan transports.TransportInstance) |
| wg := &sync.WaitGroup{} |
| // Iterate over all network devices of this system. |
| for _, netInterface := range interfaces { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| addrs, err := netInterface.Addrs() |
| if err != nil { |
| return err |
| } |
| wg.Go(func() { |
| defer func() { |
| if err := recover(); err != nil { |
| d.log.Error(). |
| Str("stack", string(debug.Stack())). |
| Interface("err", err). |
| Msg("panic-ed") |
| } |
| }() |
| // Iterate over all addresses the current interface has configured |
| // For KNX we're only interested in IPv4 addresses, as it doesn't |
| // seem to work with IPv6. |
| for _, addr := range addrs { |
| if err := ctx.Err(); err != nil { |
| d.log.Debug().Err(err).Msg("done") |
| return |
| } |
| var ipv4Addr net.IP |
| switch addr.(type) { |
| // If the device is configured to communicate with a subnet |
| case *net.IPNet: |
| ipv4Addr = addr.(*net.IPNet).IP.To4() |
| |
| // If the device is configured for a point-to-point connection |
| case *net.IPAddr: |
| ipv4Addr = addr.(*net.IPAddr).IP.To4() |
| } |
| |
| // If we found an IPv4 address and this is not a loopback address, |
| // add it to the list of devices we will open ports and send discovery |
| // messages from. |
| if ipv4Addr == nil || ipv4Addr.IsLoopback() { |
| continue |
| } |
| d.transportInstanceCreationQueue.Submit(ctx, d.transportInstanceCreationWorkItemId.Add(1), d.createTransportInstanceDispatcher(ctx, wg, connectionUrl, ipv4Addr, udpTransport, transportInstances)) |
| } |
| }) |
| } |
| d.wg.Go(func() { |
| wg.Wait() |
| d.log.Trace().Msg("Closing transport instance channel") |
| close(transportInstances) |
| }) |
| |
| d.wg.Go(func() { |
| defer func() { |
| if err := recover(); err != nil { |
| d.log.Error(). |
| Str("stack", string(debug.Stack())). |
| Interface("err", err). |
| Msg("panic-ed") |
| } |
| }() |
| for transportInstance := range transportInstances { |
| if transportInstance == nil { |
| d.log.Trace().Msg("channel closed") |
| break |
| } |
| d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(ctx, transportInstance.(*udp.TransportInstance), callback)) |
| } |
| }) |
| return nil |
| } |
| |
| func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport *udp.Transport, transportInstances chan transports.TransportInstance) pool.Runnable { |
| wg.Add(1) |
| return func(workerCtx context.Context) { |
| ctx, cancel := context.WithCancel(ctx) |
| context.AfterFunc(workerCtx, cancel) |
| defer wg.Done() |
| // Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address) |
| transportInstance, err := |
| udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil, |
| &net.UDPAddr{IP: ipv4Addr, Port: 0}) |
| if err != nil { |
| d.log.Error().Err(err).Msg("error creating transport instance") |
| return |
| } |
| err = transportInstance.Connect(ctx) |
| if err != nil { |
| d.log.Debug().Err(err).Msg("Error Connecting") |
| return |
| } |
| d.log.Debug().Interface("transportInstance", transportInstance).Msg("Adding transport instance to scan %v") |
| transportInstances <- transportInstance |
| } |
| } |
| |
| func (d *Discoverer) createDeviceScanDispatcher(ctx context.Context, udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) pool.Runnable { |
| return func(workerCtx context.Context) { |
| ctx, cancel := context.WithCancel(ctx) |
| context.AfterFunc(workerCtx, cancel) |
| d.log.Debug().Interface("udpTransportInstance", udpTransportInstance).Msg("Scanning") |
| // Create a codec for sending and receiving messages. |
| codec := NewMessageCodec( |
| udpTransportInstance, |
| nil, |
| append(d._options, options.WithCustomLogger(d.log))..., |
| ) |
| // Explicitly start the worker |
| if err := codec.Connect(ctx); err != nil { |
| d.log.Error().Err(err).Msg("Error connecting") |
| return |
| } |
| |
| localAddress := udpTransportInstance.LocalAddress |
| localAddr := driverModel.NewIPAddress(localAddress.IP) |
| |
| // Prepare the discovery packet data |
| discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint( |
| driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port)) |
| searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint) |
| // Send the search request. |
| if err := codec.Send(ctx, "device_scan_search_request", searchRequestMessage); err != nil { |
| d.log.Debug().Err(err).Interface("searchRequestMessage", searchRequestMessage).Msg("Error sending message") |
| return |
| } |
| // Keep on reading responses till the timeout is done. |
| // TODO: Make this configurable |
| timeout := time.NewTimer(1 * time.Second) |
| timeout.Stop() |
| for start := time.Now(); time.Since(start) < time.Second*5; { |
| if err := ctx.Err(); err != nil { |
| d.log.Debug().Err(err).Msg("done") |
| return |
| } |
| timeout.Reset(1 * time.Second) |
| select { |
| case message := <-codec.GetDefaultIncomingMessageChannel(): |
| { |
| if !timeout.Stop() { |
| <-timeout.C |
| } |
| searchResponse := message.(driverModel.SearchResponse) |
| if searchResponse != nil { |
| addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr() |
| remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d", |
| addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort())) |
| if err != nil { |
| continue |
| } |
| deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00")) |
| discoveryEvent := spiModel.NewDefaultPlcDiscoveryItem( |
| "knxnet-ip", |
| "udp", |
| *remoteUrl, |
| nil, |
| deviceName, |
| nil, |
| ) |
| // Pass the event back to the callback |
| callback(discoveryEvent) |
| } |
| continue |
| } |
| case <-timeout.C: |
| { |
| timeout.Stop() |
| continue |
| } |
| } |
| } |
| } |
| } |
| |
| func (d *Discoverer) Close() error { |
| defer utils.StopWarn(d.log)() |
| d.log.Trace().Msg("Closing discoverer") |
| var collectedErrors []error |
| d.log.Trace().Msg("Closing transport instance creation queue") |
| if err := d.transportInstanceCreationQueue.Close(); err != nil { |
| collectedErrors = append(collectedErrors, errors.Wrap(err, "error closing transport instance creation queue")) |
| } |
| d.log.Trace().Msg("Closing device scanning queue") |
| if err := d.deviceScanningQueue.Close(); err != nil { |
| collectedErrors = append(collectedErrors, errors.Wrap(err, "error closing device scanning queue")) |
| } |
| d.log.Trace().Msg("waiting for wait group") |
| d.wg.Wait() |
| if err := stdErrors.Join(collectedErrors...); err != nil { |
| return errors.Wrap(err, "error closing discoverer") |
| } |
| return nil |
| } |