blob: 3eca22f23734c2ba28facc39ba39cc69cbe5617a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package mtech_lora
import (
"bytes"
"encoding/base64"
"encoding/binary"
"fmt"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/joaojeronimo/go-crc16"
"github.com/runtimeco/go-coap"
"github.com/ugorji/go/codec"
"mynewt.apache.org/newtmgr/nmxact/lora"
"mynewt.apache.org/newtmgr/nmxact/mgmt"
"mynewt.apache.org/newtmgr/nmxact/nmcoap"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/omp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
type LoraSesn struct {
cfg sesn.SesnCfg
txvr *mgmt.Transceiver
isOpen bool
mtu int
xport *LoraXport
msgListener *Listener
reassListener *Listener
tgtListener *Listener
wg sync.WaitGroup
stopChan chan struct{}
txFilterCb nmcoap.MsgFilter
rxFilterCb nmcoap.MsgFilter
}
type mtechLoraTx struct {
Port uint8 `json:"port"`
Data string `json:"data"`
Ack bool `json:"ack"`
}
func NewLoraSesn(cfg sesn.SesnCfg, lx *LoraXport) (*LoraSesn, error) {
if cfg.Lora.Addr != "" || cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
// Allow server to bind to wildcard address
addr, err := NormalizeAddr(cfg.Lora.Addr)
if err != nil {
return nil, fmt.Errorf("Invalid Lora address %s\n", cfg.Lora.Addr)
}
cfg.Lora.Addr = addr
}
if cfg.Lora.Port < 1 || cfg.Lora.Port > 223 {
return nil, fmt.Errorf("Invalid Lora Port %d\n", cfg.Lora.Addr)
}
s := &LoraSesn{
cfg: cfg,
xport: lx,
mtu: 0,
txFilterCb: cfg.TxFilterCb,
rxFilterCb: cfg.RxFilterCb,
}
return s, nil
}
func (s *LoraSesn) Open() error {
if s.isOpen == true {
return nmxutil.NewSesnAlreadyOpenError(
"Attempt to open an already-open Lora session")
}
txFilterCb, rxFilterCb := s.Filters()
txvr, err := mgmt.NewTransceiver(txFilterCb, rxFilterCb, false,
s.cfg.MgmtProto, 3)
if err != nil {
return err
}
s.txvr = txvr
s.stopChan = make(chan struct{})
msgType := "rsp"
if s.cfg.MgmtProto == sesn.MGMT_PROTO_COAP_SERVER {
msgType = "req"
}
s.xport.Lock()
msgKey := TgtPortKey(s.cfg.Lora.Addr, s.cfg.Lora.Port, msgType)
s.msgListener = NewListener()
err = s.xport.msgMap.AddListener(msgKey, s.msgListener)
if err != nil {
s.xport.Unlock()
s.txvr.Stop()
return err
}
if s.cfg.Lora.Addr != "" {
msgKey, l := s.xport.reassMap.FindListener(s.cfg.Lora.Addr,
s.cfg.Lora.Port, "reass")
if l != nil {
l.RefCnt++
} else {
msgKey = TgtPortKey(s.cfg.Lora.Addr, s.cfg.Lora.Port, "reass")
l = NewListener()
s.xport.reassMap.AddListener(msgKey, l)
}
s.reassListener = l
tgtKey := TgtKey(s.cfg.Lora.Addr, "rx")
s.tgtListener = NewListener()
err = s.xport.tgtMap.AddListener(tgtKey, s.tgtListener)
if err != nil {
s.xport.Unlock()
s.closeListeners()
s.txvr.Stop()
return err
}
}
s.xport.Unlock()
if s.cfg.MgmtProto == sesn.MGMT_PROTO_COAP_SERVER {
s.isOpen = true
return nil
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.closeListeners()
for {
select {
case msg, ok := <-s.msgListener.MsgChan:
if ok {
s.txvr.DispatchCoap(msg)
}
case mtu, ok := <-s.tgtListener.MtuChan:
if ok {
if s.mtu != mtu {
log.Debugf("Setting mtu for %s %d",
s.cfg.Lora.Addr, mtu)
}
s.mtu = mtu
}
case <-s.stopChan:
return
}
}
}()
s.isOpen = true
return nil
}
func (s *LoraSesn) closeListeners() {
s.xport.Lock()
s.xport.msgMap.RemoveListener(s.msgListener)
if s.tgtListener != nil {
s.xport.tgtMap.RemoveListener(s.tgtListener)
}
if s.reassListener != nil {
s.reassListener.Close()
if s.reassListener.RefCnt == 0 {
s.xport.reassMap.RemoveListener(s.reassListener)
}
}
s.xport.Unlock()
}
func (s *LoraSesn) Close() error {
if s.isOpen == false {
return nmxutil.NewSesnClosedError(
"Attempt to close an unopened Lora session")
}
s.isOpen = false
s.txvr.ErrorAll(fmt.Errorf("manual close"))
s.txvr.Stop()
close(s.stopChan)
s.msgListener.Close()
if s.tgtListener != nil {
s.tgtListener.Close()
}
s.wg.Wait()
if s.cfg.Lora.Addr != "" {
s.xport.Tx([]byte(fmt.Sprintf("lora/%s/flush",
DenormalizeAddr(s.cfg.Lora.Addr))))
}
s.stopChan = nil
s.txvr = nil
if s.cfg.MgmtProto == sesn.MGMT_PROTO_COAP_SERVER {
s.closeListeners()
}
return nil
}
func (s *LoraSesn) Mtu() int {
if s.cfg.Lora.SegSz != 0 {
return s.cfg.Lora.SegSz
}
if s.mtu != 0 {
return s.mtu
}
return s.xport.minMtu()
}
func (s *LoraSesn) IsOpen() bool {
return s.isOpen
}
func (s *LoraSesn) MtuIn() int {
return MAX_PACKET_SIZE_IN - omp.OMP_MSG_OVERHEAD - nmp.NMP_HDR_SIZE
}
func (s *LoraSesn) MtuOut() int {
// We want image upload to use chunk size which fits inside a single
// lora segment, when possible. If the datarate is low enough, then we have
// to fragment, but try to avoid it if possible.
mtu := MAX_PACKET_SIZE_OUT
if s.mtu > mtu {
mtu = s.mtu
}
return mtu - omp.OMP_MSG_OVERHEAD - nmp.NMP_HDR_SIZE
}
func (s *LoraSesn) sendFragments(b []byte) error {
segSz := s.Mtu()
crc := crc16.Crc16(b)
idx := 0
for off := 0; off < len(b); {
var seg bytes.Buffer
var blkLen int
if off == 0 {
hdr := lora.CoapLoraFragStart{
FragNum: 0,
Crc: crc,
}
blkLen = segSz - 4
if blkLen >= len(b) {
blkLen = len(b)
hdr.FragNum |= lora.COAP_LORA_LAST_FRAG
}
binary.Write(&seg, binary.LittleEndian, hdr)
seg.Write(b[0:blkLen])
} else {
hdr := lora.CoapLoraFrag{
FragNum: uint8(idx),
}
blkLen = segSz - 1
if blkLen >= len(b)-off {
blkLen = len(b) - off
hdr.FragNum |= lora.COAP_LORA_LAST_FRAG
}
binary.Write(&seg, binary.LittleEndian, hdr)
seg.Write(b[off : off+blkLen])
}
off += blkLen
idx++
seg64 := make([]byte, base64.StdEncoding.EncodedLen(len(seg.Bytes())))
base64.StdEncoding.Encode(seg64, seg.Bytes())
msg := mtechLoraTx{
Port: s.cfg.Lora.Port,
Ack: s.cfg.Lora.ConfirmedTx,
Data: string(seg64),
}
payload := []byte{}
enc := codec.NewEncoderBytes(&payload, new(codec.JsonHandle))
enc.Encode(msg)
var outData bytes.Buffer
outData.Write([]byte(fmt.Sprintf("lora/%s/down %s",
DenormalizeAddr(s.cfg.Lora.Addr), payload)))
err := s.xport.Tx(outData.Bytes())
if err != nil {
return err
}
}
return nil
}
func (s *LoraSesn) TxNmpOnce(m *nmp.NmpMsg, opt sesn.TxOptions) (
nmp.NmpRsp, error) {
if !s.IsOpen() {
return nil, nmxutil.NewSesnClosedError(
"Attempt to transmit over closed Lora session")
}
txFunc := func(b []byte) error {
return s.sendFragments(b)
}
return s.txvr.TxNmp(txFunc, m, s.MtuOut(), opt.Timeout)
}
func (s *LoraSesn) AbortRx(seq uint8) error {
s.txvr.ErrorAll(fmt.Errorf("Rx aborted"))
return nil
}
func (s *LoraSesn) TxCoapOnce(m coap.Message, resType sesn.ResourceType,
opt sesn.TxOptions) (coap.COAPCode, []byte, error) {
if !s.IsOpen() {
return 0, nil, nmxutil.NewSesnClosedError(
"Attempt to transmit over closed Lora session")
}
txFunc := func(b []byte) error {
return s.sendFragments(b)
}
rsp, err := s.txvr.TxOic(txFunc, m, s.MtuOut(), opt.Timeout)
if err != nil {
return 0, nil, err
} else if rsp == nil {
return 0, nil, nil
} else {
return rsp.Code(), rsp.Payload(), nil
}
}
func (s *LoraSesn) TxCoapObserve(m coap.Message, resType sesn.ResourceType,
opt sesn.TxOptions, NotifCb sesn.GetNotifyCb, stopsignal chan int) (coap.COAPCode, []byte, []byte, error) {
return 0, nil, nil, nil
}
func (s *LoraSesn) MgmtProto() sesn.MgmtProto {
return s.cfg.MgmtProto
}
func (s *LoraSesn) CoapIsTcp() bool {
return false
}
func (s *LoraSesn) RxAccept() (sesn.Sesn, *sesn.SesnCfg, error) {
if !s.isOpen {
return nil, nil, nmxutil.NewSesnClosedError(
"Attempt to listen for data from closed connection")
}
if s.cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
return nil, nil, fmt.Errorf("Invalid operation for %s", s.cfg.MgmtProto)
}
if s.tgtListener != nil {
return nil, nil, fmt.Errorf("RxAccept() only wildcard destinations")
}
s.wg.Add(1)
defer s.wg.Done()
for {
select {
case cl_s, ok := <-s.msgListener.ConnChan:
if !ok {
continue
}
cl_s.cfg.Lora.ConfirmedTx = s.cfg.Lora.ConfirmedTx
cl_s.cfg.Lora.SegSz = s.cfg.Lora.SegSz
cl_s.cfg.TxFilterCb = s.cfg.TxFilterCb
cl_s.cfg.RxFilterCb = s.cfg.RxFilterCb
return cl_s, &cl_s.cfg, nil
case <-s.stopChan:
return nil, nil, fmt.Errorf("Session closed")
}
}
}
func (s *LoraSesn) RxCoap(opt sesn.TxOptions) (coap.Message, error) {
if !s.isOpen {
return nil, nmxutil.NewSesnClosedError(
"Attempt to listen for data from closed connection")
}
if s.cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
return nil, fmt.Errorf("Invalid operation for %s", s.cfg.MgmtProto)
}
if s.tgtListener == nil {
return nil, fmt.Errorf("RxCoap() only connected sessions")
}
waitTmoChan := time.After(opt.Timeout)
s.wg.Add(1)
defer s.wg.Done()
for {
select {
case data, ok := <-s.msgListener.MsgChan:
if !ok {
continue
}
msg, err := s.txvr.ProcessCoapReq(data)
if err != nil {
return nil, err
}
if msg != nil {
return msg, nil
}
case mtu, ok := <-s.tgtListener.MtuChan:
if ok {
if s.mtu != mtu {
log.Debugf("Setting mtu for %s %d",
s.cfg.Lora.Addr, mtu)
}
s.mtu = mtu
}
case _, ok := <-waitTmoChan:
waitTmoChan = nil
if ok {
return nil, nmxutil.NewRspTimeoutError(
"RxCoap() timed out")
}
case <-s.stopChan:
return nil, fmt.Errorf("Session closed")
}
}
}
func (s *LoraSesn) Filters() (nmcoap.MsgFilter, nmcoap.MsgFilter) {
return s.txFilterCb, s.rxFilterCb
}