blob: cab7a79b530f16dcacec09142d3d1c5f767d92a4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package omp
import (
"fmt"
"sync"
"github.com/runtimeco/go-coap"
"mynewt.apache.org/newtmgr/nmxact/nmcoap"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/nmxutil"
)
// Listener ties together the NMP listener handed out to clients and the CoAP
// listener that feeds it, plus the channel used to stop the goroutine that
// forwards between the two.
type Listener struct {
// nmpl is the client-facing NMP listener; decoded responses and errors are
// sent to its RspChan and ErrChan.
nmpl *nmp.Listener
// coapl is the CoAP listener whose RspChan and ErrChan are read by the
// forwarding goroutine.
coapl *nmcoap.Listener
// stopCh is closed to make the forwarding goroutine return.
stopCh chan struct{}
}
// Dispatcher tracks OMP listeners by NMP sequence number and delegates all
// CoAP-level work to a wrapped nmcoap.Dispatcher.
//
// The dispatcher is the owner of the listeners it points to. Only the
// dispatcher writes to these listeners.
type Dispatcher struct {
// seqListenerMap holds the active listeners keyed by sequence number. It is
// only accessed with mtx held.
seqListenerMap map[uint8]*Listener
// coapd is the CoAP dispatcher that the CoAP-related methods forward to.
coapd *nmcoap.Dispatcher
// wg counts the running forwarding goroutines; Stop waits on it.
wg sync.WaitGroup
// rxFilter is passed to DecodeOmp for each received message.
// NOTE(review): it is read and written without holding mtx.
rxFilter nmcoap.MsgFilter
// stopped is set by Stop so that later Stop calls return immediately.
stopped bool
// logDepth is passed to the nmxutil listener logging helpers.
logDepth int
// mtx guards seqListenerMap and stopped.
mtx sync.Mutex
}
// NewDispatcher builds a Dispatcher with an empty listener table.
//
// rxFilter is stored and later handed to DecodeOmp for each received message.
// isTcp and logDepth+1 are passed on to nmcoap.NewDispatcher, while the
// dispatcher keeps logDepth+2 for its own listener logging.  The returned
// error is always nil.
func NewDispatcher(rxFilter nmcoap.MsgFilter, isTcp bool,
	logDepth int) (*Dispatcher, error) {

	disp := new(Dispatcher)
	disp.seqListenerMap = make(map[uint8]*Listener)
	disp.coapd = nmcoap.NewDispatcher(isTcp, logDepth+1)
	disp.rxFilter = rxFilter
	disp.logDepth = logDepth + 2

	return disp, nil
}
// addOmpListener registers a listener for the given sequence number and
// starts a goroutine that forwards its CoAP traffic to an NMP listener.
//
// A CoAP listener is added for the token derived from seq (with an empty
// path).  Each message it receives is decoded with DecodeOmp; decoded
// responses and errors, as well as errors reported by the CoAP listener, are
// forwarded to the channels of the NMP listener held by the returned Listener.
// The goroutine returns once the listener's stopCh is closed, and removes the
// CoAP listener on its way out.
//
// An error is returned if the dispatcher has already been stopped, if a
// listener for seq already exists, or if the CoAP listener cannot be added.
func (d *Dispatcher) addOmpListener(seq uint8) (*Listener, error) {
	d.mtx.Lock()
	defer d.mtx.Unlock()

	// Stop() has already closed every listener and waited for their
	// goroutines; a listener added now would outlive the shutdown.
	if d.stopped {
		return nil, fmt.Errorf(
			"dispatcher stopped; cannot add OMP listener; seq=%d", seq)
	}

	if d.seqListenerMap[seq] != nil {
		return nil, fmt.Errorf("duplicate OMP listener; seq=%d", seq)
	}

	mc := nmcoap.MsgCriteria{
		Token: nmxutil.SeqToToken(seq),
		Path:  "",
	}

	ol, err := d.AddCoapListener(mc)
	if err != nil {
		return nil, err
	}

	ompl := &Listener{
		nmpl:   nmp.NewListener(),
		coapl:  ol,
		stopCh: make(chan struct{}),
	}
	d.seqListenerMap[seq] = ompl

	d.wg.Add(1)
	go func() {
		defer d.RemoveCoapListener(mc)
		defer d.wg.Done()

		// Listen for events.  All feedback is sent to the client via the NMP
		// listener channels.  It is done this way so that client code can be
		// OMP/NMP agnostic.
		//
		// Every forward also watches stopCh.  Without that, a client that has
		// stopped draining its channels would pin this goroutine in a send
		// forever, and Stop() would never return from its wait.
		for {
			select {
			case m := <-ompl.coapl.RspChan:
				rsp, err := DecodeOmp(m, d.rxFilter)
				switch {
				case err != nil:
					select {
					case ompl.nmpl.ErrChan <- err:
					case <-ompl.stopCh:
						return
					}
				case rsp != nil:
					select {
					case ompl.nmpl.RspChan <- rsp:
					case <-ompl.stopCh:
						return
					}
				default:
					// No error and no response; nothing to forward.
				}

			case err := <-ompl.coapl.ErrChan:
				if err != nil {
					select {
					case ompl.nmpl.ErrChan <- err:
					case <-ompl.stopCh:
						return
					}
				}

			case <-ompl.stopCh:
				return
			}
		}
	}()

	return ompl, nil
}
// Stop closes the stop channel of every registered listener, empties the
// listener table and then waits for all forwarding goroutines to finish.
// Only the first call does any work; later calls return immediately.
//
// The mutex is held for the whole call, including the wait.
func (d *Dispatcher) Stop() {
	d.mtx.Lock()
	defer d.mtx.Unlock()

	if d.stopped {
		return
	}
	d.stopped = true

	for key := range d.seqListenerMap {
		listener := d.seqListenerMap[key]
		close(listener.stopCh)
		delete(d.seqListenerMap, key)
	}

	d.wg.Wait()
}
// Dispatch passes data to the wrapped CoAP dispatcher's Dispatch method and
// returns its result unchanged.
func (d *Dispatcher) Dispatch(data []byte) bool {
	result := d.coapd.Dispatch(data)
	return result
}
// ProcessCoapReq passes data to the wrapped CoAP dispatcher's ProcessCoapReq
// method and returns the message and error it produces.
func (d *Dispatcher) ProcessCoapReq(data []byte) (coap.Message, error) {
	msg, err := d.coapd.ProcessCoapReq(data)
	return msg, err
}
// AddCoapListener registers a listener for the criteria mc with the wrapped
// CoAP dispatcher and returns whatever its AddListener call returns.
func (d *Dispatcher) AddCoapListener(
	mc nmcoap.MsgCriteria) (*nmcoap.Listener, error) {

	listener, err := d.coapd.AddListener(mc)
	return listener, err
}
// RemoveCoapListener asks the wrapped CoAP dispatcher to remove the listener
// matching mc and returns the result of its RemoveListener call.
func (d *Dispatcher) RemoveCoapListener(
	mc nmcoap.MsgCriteria) *nmcoap.Listener {

	removed := d.coapd.RemoveListener(mc)
	return removed
}
// AddNmpListener registers an OMP listener for seq and returns the NMP
// listener that the OMP traffic is forwarded to.  If registration fails, the
// error is returned and nothing is logged; otherwise the addition is logged
// using the dispatcher's log depth.
func (d *Dispatcher) AddNmpListener(seq uint8) (*nmp.Listener, error) {
	var (
		entry *Listener
		err   error
	)

	if entry, err = d.addOmpListener(seq); err != nil {
		return nil, err
	}

	nmxutil.LogAddNmpListener(d.logDepth, seq)

	return entry.nmpl, nil
}
// RemoveNmpListener unregisters the listener for seq.  It closes the
// listener's stop channel, drops it from the listener table, logs the removal
// and returns the associated NMP listener.  It returns nil, and does nothing
// else, when no listener is registered for seq.
func (d *Dispatcher) RemoveNmpListener(seq uint8) *nmp.Listener {
	d.mtx.Lock()
	defer d.mtx.Unlock()

	entry, found := d.seqListenerMap[seq]
	if !found || entry == nil {
		return nil
	}

	close(entry.stopCh)
	delete(d.seqListenerMap, seq)

	nmxutil.LogRemoveNmpListener(d.logDepth, seq)

	return entry.nmpl
}
// ErrorOneNmp delivers err to the error channel of the NMP listener
// registered for seq.
//
// The listener is looked up with the mutex held, but the send happens after
// the mutex has been released.  Sending while holding the mutex would stall
// every other dispatcher call (including Stop and RemoveNmpListener) for as
// long as the client does not accept the error.  The send is abandoned if the
// listener's stop channel is closed before the error is accepted.
//
// It returns an error if no listener is registered for seq, or if the
// listener is removed before the error could be delivered.
func (d *Dispatcher) ErrorOneNmp(seq uint8, err error) error {
	d.mtx.Lock()
	ompl := d.seqListenerMap[seq]
	d.mtx.Unlock()

	if ompl == nil {
		return fmt.Errorf("no nmp listener for seq %d", seq)
	}

	select {
	case ompl.nmpl.ErrChan <- err:
		return nil
	case <-ompl.stopCh:
		// The listener was removed while we were waiting to deliver.
		return fmt.Errorf("no nmp listener for seq %d", seq)
	}
}
// ErrorAll passes err to the wrapped CoAP dispatcher's ErrorAll method.
func (d *Dispatcher) ErrorAll(err error) {
	coapDispatcher := d.coapd
	coapDispatcher.ErrorAll(err)
}
// SetRxFilter replaces the filter that is passed to DecodeOmp for received
// messages.  The field is written without taking the dispatcher's mutex.
func (d *Dispatcher) SetRxFilter(rxFilter nmcoap.MsgFilter) {
	(*d).rxFilter = rxFilter
}
// RxFilter returns the filter currently stored in the dispatcher.  The field
// is read without taking the dispatcher's mutex.
func (d *Dispatcher) RxFilter() nmcoap.MsgFilter {
	current := d.rxFilter
	return current
}