blob: 07e8fc5612941a23114ed64835854d7c8d90f4d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package bacnetip
import (
"fmt"
"net"
"time"
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/libp2p/go-reuseport"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
type UDPActor struct {
director *UDPDirector
timeout uint32
timer *OneShotFunctionTask
peer string
}
func NewUDPActor(director *UDPDirector, peer string) *UDPActor {
a := &UDPActor{}
// keep track of the director
a.director = director
// associated with a peer
a.peer = peer
// Add a timer
a.timeout = director.timeout
if a.timeout > 0 {
a.timer = FunctionTask(a.idleTimeout)
when := time.Now().Add(time.Duration(a.timeout) * time.Millisecond)
a.timer.InstallTask(&when, nil)
}
// tell the director this is a new actor
a.director.AddActor(a)
return a
}
func (a *UDPActor) idleTimeout() error {
log.Debug().Msg("idleTimeout")
// tell the director this is gone
a.director.DelActor(a)
return nil
}
func (a *UDPActor) Indication(pdu _PDU) error {
log.Debug().Msgf("Indication %s", pdu)
// reschedule the timer
if a.timer != nil {
when := time.Now().Add(time.Duration(a.timeout) * time.Millisecond)
a.timer.InstallTask(&when, nil)
}
// put it in the outbound queue for the director
a.director.request <- pdu
return nil
}
func (a *UDPActor) Response(pdu _PDU) error {
log.Debug().Msgf("Response %s", pdu)
// reschedule the timer
if a.timer != nil {
when := time.Now().Add(time.Duration(a.timeout) * time.Millisecond)
a.timer.InstallTask(&when, nil)
}
// process this as a response from the director
return a.director.Response(pdu)
}
func (a *UDPActor) HandleError(err error) {
log.Debug().Err(err).Msg("HandleError")
if err != nil {
a.director.ActorError(err)
}
}
type UDPDirector struct {
*Server
*ServiceAccessPoint
timeout uint32
reuse bool
address AddressTuple[string, uint16]
udpConn *net.UDPConn
actorClass func(*UDPDirector, string) *UDPActor
request chan _PDU
peers map[string]*UDPActor
running bool
}
func NewUDPDirector(address AddressTuple[string, uint16], timeout *int, reuse *bool, sid *int, sapID *int) (*UDPDirector, error) {
d := &UDPDirector{}
var err error
d.Server, err = NewServer(sid, d)
if err != nil {
return nil, errors.Wrap(err, "error creating server")
}
d.ServiceAccessPoint, err = NewServiceAccessPoint(sapID, d)
if err != nil {
return nil, errors.Wrap(err, "error creating service access point")
}
// check the actor class
d.actorClass = NewUDPActor
// save the timeout for actors
if timeout != nil {
d.timeout = uint32(*timeout)
}
if reuse != nil {
d.reuse = *reuse
}
// save the address
d.address = address
// ask the dispatcher for a socket
resolvedAddress, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", address.Left, address.Right))
if err != nil {
return nil, errors.Wrap(err, "error resolving udp address")
}
if d.reuse {
if packetConn, err := reuseport.ListenPacket("udp", resolvedAddress.String()); err != nil {
return nil, errors.Wrap(err, "error connecting to local address")
} else {
d.udpConn = packetConn.(*net.UDPConn)
}
} else {
if d.udpConn, err = net.ListenUDP("udp", resolvedAddress); err != nil {
return nil, errors.Wrap(err, "error connecting to local address")
}
}
d.running = true
go func() {
for d.running {
d.handleRead()
}
}()
// create the request queue
d.request = make(chan _PDU)
go func() {
for d.running {
pdu := <-d.request
serialize, err := pdu.GetMessage().Serialize()
if err != nil {
log.Error().Err(err).Msg("Error building message")
continue
}
// TODO: wonky address object
destination := pdu.GetPDUDestination()
addr := net.IPv4(destination.AddrAddress[0], destination.AddrAddress[1], destination.AddrAddress[2], destination.AddrAddress[3])
udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, *destination.AddrPort))
if err != nil {
log.Error().Err(err).Msg("Error resolving address")
continue
}
writtenBytes, err := d.udpConn.WriteToUDP(serialize, udpAddr)
if err != nil {
log.Error().Err(err).Msg("Error writing bytes")
continue
}
log.Debug().Msgf("%d written bytes", writtenBytes)
}
}()
// start with an empty peer pool
d.peers = map[string]*UDPActor{}
return d, nil
}
// AddActor adds an actor when a new one is connected
func (d *UDPDirector) AddActor(actor *UDPActor) {
log.Debug().Msgf("AddActor %v", actor)
d.peers[actor.peer] = actor
// tell the ASE there is a new client
if d.serviceElement != nil {
// TODO: not sure how to realize that
//d.SapRequest(actor)
}
}
// DelActor removes an actor when the socket is closed.
func (d *UDPDirector) DelActor(actor *UDPActor) {
log.Debug().Msgf("DelActor %v", actor)
delete(d.peers, actor.peer)
// tell the ASE the client has gone away
if d.serviceElement != nil {
// TODO: not sure how to realize that
//d.SapRequest(actor)
}
}
func (d *UDPDirector) GetActor(address Address) *UDPActor {
return d.peers[address.String()]
}
func (d *UDPDirector) ActorError(err error) {
// tell the ASE the actor had an error
if d.serviceElement != nil {
// TODO: not sure how to realize that
//d.SapRequest(actor, err)
}
}
func (d *UDPDirector) Close() error {
d.running = false
return d.udpConn.Close()
}
func (d *UDPDirector) handleRead() {
log.Debug().Msgf("handleRead(%v)", d.address)
readBytes := make([]byte, 1500) // TODO: check if that is sufficient
var sourceAddr *net.UDPAddr
if _, addr, err := d.udpConn.ReadFromUDP(readBytes); err != nil {
log.Error().Err(err).Msg("error reading")
return
} else {
sourceAddr = addr
}
bvlc, err := model.BVLCParse(readBytes)
if err != nil {
// pass along to a handler
d.handleError(errors.Wrap(err, "error parsing bvlc"))
return
}
saddr, err := NewAddress(sourceAddr)
if err != nil {
// pass along to a handler
d.handleError(errors.Wrap(err, "error parsing source address"))
return
}
daddr, err := NewAddress(d.udpConn.LocalAddr())
if err != nil {
// pass along to a handler
d.handleError(errors.Wrap(err, "error parsing destination address"))
return
}
pdu := NewPDU(bvlc, WithPDUSource(saddr), WithPDUDestination(daddr))
// send the PDU up to the client
go func() {
if err := d._response(pdu); err != nil {
log.Debug().Err(err).Msg("errored")
}
}()
}
func (d *UDPDirector) handleError(err error) {
log.Debug().Err(err).Msg("handleError")
}
// Indication Client requests are queued for delivery.
func (d *UDPDirector) Indication(pdu _PDU) error {
log.Debug().Msgf("Indication %s", pdu)
// get the destination
addr := pdu.GetPDUDestination()
// get the peer
peer, ok := d.peers[addr.String()]
if !ok {
peer = d.actorClass(d, (*addr).String())
}
// send the message
return peer.Indication(pdu)
}
// _response Incoming datagrams are routed through an actor.
func (d *UDPDirector) _response(pdu _PDU) error {
log.Debug().Msgf("_response %s", pdu)
// get the destination
addr := pdu.GetPDUSource()
// get the peer
peer, ok := d.peers[addr.String()]
if !ok {
peer = d.actorClass(d, addr.String())
}
// send the message
return peer.Response(pdu)
}