blob: 4655a589af677a4d5dc1ced8f47a772312bbec9e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package mgmt
import (
"encoding/hex"
"fmt"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/runtimeco/go-coap"
"mynewt.apache.org/newtmgr/nmxact/nmcoap"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/omp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
type TxFn func(req []byte) error
type Transceiver struct {
// Only for plain NMP; nil for OMP transceivers.
nd *nmp.Dispatcher
// Used for OMP and CoAP resource requests.
od *omp.Dispatcher
isTcp bool
proto sesn.MgmtProto
wg sync.WaitGroup
}
func NewTransceiver(txFilterCb, rxFilterCb nmcoap.MsgFilter, isTcp bool, mgmtProto sesn.MgmtProto, logDepth int) (
*Transceiver, error) {
t := &Transceiver{
isTcp: isTcp,
proto: mgmtProto,
}
if mgmtProto == sesn.MGMT_PROTO_NMP {
t.nd = nmp.NewDispatcher(logDepth)
}
od, err := omp.NewDispatcher(txFilterCb, rxFilterCb, isTcp, logDepth)
if err != nil {
return nil, err
}
t.od = od
return t, nil
}
func (t *Transceiver) txPlain(txCb TxFn, req *nmp.NmpMsg, mtu int,
timeout time.Duration) (nmp.NmpRsp, error) {
nl, err := t.nd.AddListener(req.Hdr.Seq)
if err != nil {
return nil, err
}
defer t.nd.RemoveListener(req.Hdr.Seq)
b, err := nmp.EncodeNmpPlain(req)
if err != nil {
return nil, err
}
log.Debugf("Tx NMP request: %s", hex.Dump(b))
if t.isTcp == false && len(b) > mtu {
return nil, fmt.Errorf("Request too big")
}
frags := nmxutil.Fragment(b, mtu)
for _, frag := range frags {
if err := txCb(frag); err != nil {
return nil, err
}
}
// Now wait for NMP response.
for {
select {
case err := <-nl.ErrChan:
return nil, err
case rsp := <-nl.RspChan:
return rsp, nil
case _, ok := <-nl.AfterTimeout(timeout):
if ok {
return nil, nmxutil.NewRspTimeoutError("NMP timeout")
}
}
}
}
func (t *Transceiver) txOmp(txCb TxFn, req *nmp.NmpMsg, mtu int,
timeout time.Duration) (nmp.NmpRsp, error) {
nl, err := t.od.AddNmpListener(req.Hdr.Seq)
if err != nil {
return nil, err
}
defer t.od.RemoveNmpListener(req.Hdr.Seq)
var b []byte
txFilterCb, _ := t.od.Filters()
if t.isTcp {
b, err = omp.EncodeOmpTcp(txFilterCb, req)
} else {
b, err = omp.EncodeOmpDgram(txFilterCb, req)
}
if err != nil {
return nil, err
}
log.Debugf("Tx OMP request: %s", hex.Dump(b))
if t.isTcp == false && len(b) > mtu {
return nil, fmt.Errorf("Request too big")
}
frags := nmxutil.Fragment(b, mtu)
for _, frag := range frags {
if err := txCb(frag); err != nil {
return nil, err
}
}
// Now wait for NMP response.
for {
select {
case err := <-nl.ErrChan:
return nil, err
case rsp := <-nl.RspChan:
return rsp, nil
case _, ok := <-nl.AfterTimeout(timeout):
if ok {
return nil, nmxutil.NewRspTimeoutError("NMP timeout")
}
}
}
}
func (t *Transceiver) TxNmp(txCb TxFn, req *nmp.NmpMsg, mtu int,
timeout time.Duration) (nmp.NmpRsp, error) {
if t.nd != nil {
return t.txPlain(txCb, req, mtu, timeout)
} else {
return t.txOmp(txCb, req, mtu, timeout)
}
}
func (t *Transceiver) TxOic(txCb TxFn, req coap.Message, mtu int,
timeout time.Duration) (coap.Message, error) {
b, err := nmcoap.Encode(req)
if err != nil {
return nil, err
}
var rspExpected bool
switch req.Type() {
case coap.Confirmable:
rspExpected = true
case coap.NonConfirmable:
rspExpected = req.Code() == coap.GET
default:
rspExpected = false
}
if t.proto == sesn.MGMT_PROTO_COAP_SERVER {
rspExpected = false
}
var ol *nmcoap.Listener
if rspExpected {
ol, err = t.od.AddOicListener(req.Token())
if err != nil {
return nil, err
}
defer t.od.RemoveOicListener(req.Token())
}
log.Debugf("Tx OIC request: %s", hex.Dump(b))
frags := nmxutil.Fragment(b, mtu)
for _, frag := range frags {
if err := txCb(frag); err != nil {
return nil, err
}
}
if !rspExpected {
return nil, nil
}
for {
select {
case err := <-ol.ErrChan:
return nil, err
case rsp := <-ol.RspChan:
return rsp, nil
case _, ok := <-ol.AfterTimeout(timeout):
if ok {
return nil, nmxutil.NewRspTimeoutError("OIC timeout")
}
}
}
}
func (t *Transceiver) TxOicObserve(txCb TxFn, req coap.Message, mtu int,
timeout time.Duration, NotifyCb sesn.GetNotifyCb, stopsignal chan int) (coap.Message, error) {
b, err := nmcoap.Encode(req)
if err != nil {
return nil, err
}
var ol *nmcoap.Listener
ol, err = t.od.AddOicListener(req.Token())
if err != nil {
return nil, err
}
log.Debugf("Tx OIC request: %s", hex.Dump(b))
frags := nmxutil.Fragment(b, mtu)
for _, frag := range frags {
if err := txCb(frag); err != nil {
t.od.RemoveOicListener(req.Token())
return nil, err
}
}
var rsp coap.Message
first := make(chan int)
iter := 0
go func() {
defer t.od.RemoveOicListener(req.Token())
for {
select {
case err = <-ol.ErrChan:
log.Debugf("Error: %s", err)
first <- 2
return
case rsp = <-ol.RspChan:
if iter == 0 {
first <- 1
iter = 1
} else {
NotifyCb(req.PathString(), rsp.Code(), rsp.Payload(), rsp.Token())
}
case _, ok := <-ol.AfterTimeout(timeout):
if ok && iter == 0 {
err = nmxutil.NewRspTimeoutError("OIC timeout")
first <- 2
return
} else {
log.Debugf("Timeout")
}
case <-stopsignal:
/* Observing stopped by user */
return
}
}
}()
for {
select {
case a := <-first:
if a == 1 {
return rsp, nil
} else {
return nil, err
}
}
}
}
func (t *Transceiver) DispatchNmpRsp(data []byte) {
if t.nd != nil {
t.nd.Dispatch(data)
} else {
log.Debugf("Rx OIC response: %s", hex.Dump(data))
t.od.Dispatch(data)
}
}
func (t *Transceiver) DispatchCoap(data []byte) {
t.od.Dispatch(data)
}
func (t *Transceiver) ProcessCoapReq(data []byte) (coap.Message, error) {
return t.od.ProcessCoapReq(data)
}
func (t *Transceiver) ErrorOne(seq uint8, err error) {
if t.nd != nil {
t.nd.ErrorOne(seq, err)
} else {
t.od.ErrorOneNmp(seq, err)
}
}
func (t *Transceiver) ErrorAll(err error) {
if t.nd != nil {
t.nd.ErrorAll(err)
}
t.od.ErrorAll(err)
}
func (t *Transceiver) AbortRx(seq uint8) {
t.ErrorOne(seq, fmt.Errorf("rx aborted"))
}
func (t *Transceiver) Stop() {
t.od.Stop()
}
func (t *Transceiver) MgmtProto() sesn.MgmtProto {
return t.proto
}