Merge pull request #168 from ccollins476ad/freeze
Freeze filter for `image upload` chunk length calculation
diff --git a/newtmgr/bll/bll_sesn.go b/newtmgr/bll/bll_sesn.go
index 91dc81b..d062140 100644
--- a/newtmgr/bll/bll_sesn.go
+++ b/newtmgr/bll/bll_sesn.go
@@ -392,6 +392,27 @@
return s.txvr.TxRxMgmt(txRaw, m, s.MtuOut(), timeout)
}
+// TxRxMgmtAsync transmits an NMP request over the BLE session without
+// waiting for the reply.  The eventual response is delivered on ch and any
+// later failure on errc by s.txvr.TxRxMgmtAsync.
+//
+// It returns an error right away if the session is not open, if the peer
+// lacks the NMP request or response characteristic, or if s.txvr fails to
+// start the exchange.
+func (s *BllSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+
+	if !s.IsOpen() {
+		return nmxutil.NewSesnClosedError(
+			"Attempt to transmit over closed BLE session")
+	}
+
+	if s.nmpReqChr == nil || s.nmpRspChr == nil {
+		return fmt.Errorf("Cannot send NMP request; peer doesn't " +
+			"support request or response characteristic")
+	}
+
+	// Raw writer used by the transceiver for each fragment of the request.
+	txRaw := func(b []byte) error {
+		return s.txWriteCharacteristic(s.nmpReqChr, b, true)
+	}
+
+	// Propagate setup/transmit failures.  Dropping this error would make a
+	// failed send look successful and leave the caller waiting on ch/errc
+	// for a result that is never produced.
+	return s.txvr.TxRxMgmtAsync(txRaw, m, s.MtuOut(), timeout, ch, errc)
+}
+
func (s *BllSesn) TxCoap(m coap.Message) error {
txRaw := func(b []byte) error {
return s.txWriteCharacteristic(s.resReqChr, b, !s.cfg.WriteRsp)
diff --git a/newtmgr/cli/image.go b/newtmgr/cli/image.go
index 620cf0d..3e9bf21 100644
--- a/newtmgr/cli/image.go
+++ b/newtmgr/cli/image.go
@@ -45,6 +45,7 @@
var noerase bool
var upgrade bool
var imageNum int
+var maxWinSz int
func imageFlagsStr(image nmp.ImageStateEntry) string {
strs := []string{}
@@ -200,9 +201,12 @@
c.ProgressBar.SetUnits(pb.U_BYTES)
c.ProgressBar.ShowSpeed = true
c.LastOff = 0
+ c.MaxWinSz = maxWinSz
c.ProgressCb = func(cmd *xact.ImageUploadCmd, rsp *nmp.ImageUploadRsp) {
- c.ProgressBar.Add(int(rsp.Off - c.LastOff))
- c.LastOff = rsp.Off
+ if rsp.Off > c.LastOff {
+ c.ProgressBar.Add(int(rsp.Off - c.LastOff))
+ c.LastOff = rsp.Off
+ }
}
res, err := c.Run(s)
@@ -402,7 +406,7 @@
Run: imageUploadCmd,
}
uploadCmd.PersistentFlags().BoolVarP(&noerase,
- "noerase", "e", false,
+ "noerase", "e", true,
"Don't send specific image erase command to start with")
uploadCmd.PersistentFlags().BoolVarP(&upgrade,
"upgrade", "u", false,
@@ -411,6 +415,10 @@
uploadCmd.PersistentFlags().IntVarP(&imageNum,
"image", "n", 0,
"In a multi-image system, which image should be uploaded")
+ uploadCmd.PersistentFlags().IntVarP(&maxWinSz,
+ "maxwinsize", "w", xact.IMAGE_UPLOAD_DEF_MAX_WS,
+ "Set the maximum size for the window of outstanding chunks in transit. "+
+ "caution:higher num may not translate to better perf and may result in errors")
imageCmd.AddCommand(uploadCmd)
coreListCmd := &cobra.Command{
diff --git a/nmxact/mgmt/transceiver.go b/nmxact/mgmt/transceiver.go
index 3fb1d5b..74ec51a 100644
--- a/nmxact/mgmt/transceiver.go
+++ b/nmxact/mgmt/transceiver.go
@@ -114,6 +114,54 @@
}
}
+// txRxNmpAsync encodes req as plain NMP, transmits it through txCb and
+// returns without waiting for the reply.
+//
+// A listener is registered for the request's sequence number before anything
+// is sent.  Once every fragment has been handed to txCb, a goroutine waits on
+// that listener and forwards a single outcome: the response on ch, or a
+// listener error / response-timeout error on errc.  The goroutine removes the
+// listener when it finishes.
+//
+// A non-nil return value means the request was not fully transmitted.  In
+// that case the listener has already been removed and this function sends
+// nothing on ch or errc.
+func (t *Transceiver) txRxNmpAsync(txCb TxFn, req *nmp.NmpMsg, mtu int,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+
+	seq := req.Hdr.Seq
+	nl, err := t.nd.AddListener(seq)
+	if err != nil {
+		return err
+	}
+
+	// Until the waiter goroutine takes ownership, every early return must
+	// unregister the listener; otherwise it would stay registered for this
+	// sequence number indefinitely.
+	handedOff := false
+	defer func() {
+		if !handedOff {
+			t.nd.RemoveListener(seq)
+		}
+	}()
+
+	b, err := nmp.EncodeNmpPlain(req)
+	if err != nil {
+		return err
+	}
+
+	log.Debugf("Tx NMP async request: seq %d %s", seq, hex.Dump(b))
+	// Only non-TCP transports are bounded by the MTU.
+	if !t.isTcp && len(b) > mtu {
+		return fmt.Errorf("Request too big")
+	}
+	frags := nmxutil.Fragment(b, mtu)
+	for _, frag := range frags {
+		if err := txCb(frag); err != nil {
+			return err
+		}
+	}
+
+	// Now wait for NMP response.  From here on the goroutine owns the
+	// listener and is responsible for removing it.
+	handedOff = true
+	go func() {
+		defer t.nd.RemoveListener(seq)
+		for {
+			select {
+			case err := <-nl.ErrChan:
+				errc <- err
+				return
+			case rsp := <-nl.RspChan:
+				ch <- rsp
+				return
+			case _, ok := <-nl.AfterTimeout(timeout):
+				// A receive with ok == false is ignored and the wait
+				// is restarted.
+				if ok {
+					errc <- nmxutil.NewRspTimeoutError("NMP timeout")
+					return
+				}
+			}
+		}
+	}()
+
+	return nil
+}
+
func (t *Transceiver) txRxOmp(txCb TxFn, req *nmp.NmpMsg, mtu int,
timeout time.Duration) (nmp.NmpRsp, error) {
@@ -160,6 +208,62 @@
}
}
+// txRxOmpAsync encodes req as OMP (TCP or datagram framing, depending on
+// t.isTcp), transmits it through txCb and returns without waiting for the
+// reply.
+//
+// An NMP listener is registered for the request's sequence number before
+// anything is sent.  Once every fragment has been handed to txCb, a goroutine
+// waits on that listener and forwards a single outcome: the response on ch,
+// or a listener error / timeout error on errc.  The goroutine removes the
+// listener when it finishes.
+//
+// A non-nil return value means the request was not fully transmitted.  In
+// that case the listener has already been removed and this function sends
+// nothing on ch or errc.
+func (t *Transceiver) txRxOmpAsync(txCb TxFn, req *nmp.NmpMsg, mtu int,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+
+	seq := req.Hdr.Seq
+	nl, err := t.od.AddNmpListener(seq)
+	if err != nil {
+		return err
+	}
+
+	// Until the waiter goroutine takes ownership, every early return
+	// (encode failure, oversized request, transmit failure) must unregister
+	// the listener.
+	handedOff := false
+	defer func() {
+		if !handedOff {
+			t.od.RemoveNmpListener(seq)
+		}
+	}()
+
+	var b []byte
+	if t.isTcp {
+		b, err = omp.EncodeOmpTcp(t.txFilterCb, req)
+	} else {
+		b, err = omp.EncodeOmpDgram(t.txFilterCb, req)
+	}
+	if err != nil {
+		return err
+	}
+
+	log.Debugf("Tx OMP request: %v %s", seq, hex.Dump(b))
+
+	// Only non-TCP transports are bounded by the MTU.
+	if !t.isTcp && len(b) > mtu {
+		return fmt.Errorf("Request too big")
+	}
+	frags := nmxutil.Fragment(b, mtu)
+	for _, frag := range frags {
+		if err := txCb(frag); err != nil {
+			log.Debugf("txCb error %v", err)
+			return err
+		}
+	}
+
+	// Now wait for NMP response.  From here on the goroutine owns the
+	// listener and is responsible for removing it.
+	handedOff = true
+	go func() {
+		defer t.od.RemoveNmpListener(seq)
+		for {
+			select {
+			case err := <-nl.ErrChan:
+				log.Debugf("Error reported %v seq %v", err, seq)
+				errc <- err
+				return
+			case rsp := <-nl.RspChan:
+				ch <- rsp
+				return
+			case _, ok := <-nl.AfterTimeout(timeout):
+				// A receive with ok == false is ignored and the wait
+				// is restarted.
+				if ok {
+					errc <- fmt.Errorf("Request timedout")
+					return
+				}
+			}
+		}
+	}()
+	return nil
+}
+
func (t *Transceiver) TxRxMgmt(txCb TxFn, req *nmp.NmpMsg, mtu int,
timeout time.Duration) (nmp.NmpRsp, error) {
@@ -170,6 +274,16 @@
}
}
+// TxRxMgmtAsync starts an asynchronous management exchange.  The plain-NMP
+// path is used when the transceiver has an NMP dispatcher (t.nd); otherwise
+// the request goes out via OMP.  The result is reported on ch or errc.
+func (t *Transceiver) TxRxMgmtAsync(txCb TxFn, req *nmp.NmpMsg, mtu int,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+
+	if t.nd == nil {
+		return t.txRxOmpAsync(txCb, req, mtu, timeout, ch, errc)
+	}
+	return t.txRxNmpAsync(txCb, req, mtu, timeout, ch, errc)
+}
+
func (t *Transceiver) TxCoap(txCb TxFn, req coap.Message, mtu int) error {
b, err := nmcoap.Encode(req)
if err != nil {
diff --git a/nmxact/mtech_lora/mtech_lora_sesn.go b/nmxact/mtech_lora/mtech_lora_sesn.go
index 5dc796a..200207f 100644
--- a/nmxact/mtech_lora/mtech_lora_sesn.go
+++ b/nmxact/mtech_lora/mtech_lora_sesn.go
@@ -316,6 +316,18 @@
return s.txvr.TxRxMgmt(txFunc, m, s.MtuOut(), timeout)
}
+// TxRxMgmtAsync satisfies the asynchronous session API by performing the
+// blocking TxRxMgmt exchange and then publishing its outcome: the response
+// goes to ch on success, the error goes to errc on failure.  The returned
+// error is always nil.
+func (s *LoraSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	rsp, err := s.TxRxMgmt(m, timeout)
+	if err == nil {
+		ch <- rsp
+		return nil
+	}
+
+	errc <- err
+	return nil
+}
+
func (s *LoraSesn) AbortRx(seq uint8) error {
s.txvr.ErrorAll(fmt.Errorf("Rx aborted"))
return nil
diff --git a/nmxact/nmble/ble_sesn.go b/nmxact/nmble/ble_sesn.go
index 866f961..a9212ce 100644
--- a/nmxact/nmble/ble_sesn.go
+++ b/nmxact/nmble/ble_sesn.go
@@ -110,6 +110,12 @@
return s.Ns.TxRxMgmt(m, timeout)
}
+func (s *BleSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+ timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+ // Pure delegation: the wrapped session s.Ns performs the asynchronous exchange.
+ return s.Ns.TxRxMgmtAsync(m, timeout, ch, errc)
+}
+
func (s *BleSesn) TxCoap(m coap.Message) error {
return s.Ns.TxCoap(m)
}
diff --git a/nmxact/nmble/naked_sesn.go b/nmxact/nmble/naked_sesn.go
index 9129b05..ad6d4ba 100644
--- a/nmxact/nmble/naked_sesn.go
+++ b/nmxact/nmble/naked_sesn.go
@@ -351,6 +351,11 @@
return rsp, nil
}
+func (s *NakedSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+ timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error { // async exchange is not implemented for NakedSesn
+ return fmt.Errorf("unsupported") // always fails; nothing is ever sent on ch or errc
+}
+
func (s *NakedSesn) ListenCoap(
mc nmcoap.MsgCriteria) (*nmcoap.Listener, error) {
diff --git a/nmxact/nmserial/serial_sesn.go b/nmxact/nmserial/serial_sesn.go
index 46079d8..8bca77c 100644
--- a/nmxact/nmserial/serial_sesn.go
+++ b/nmxact/nmserial/serial_sesn.go
@@ -213,6 +213,18 @@
return s.txvr.TxRxMgmt(txFn, m, s.MtuOut(), timeout)
}
+// TxRxMgmtAsync satisfies the asynchronous session API by performing the
+// blocking TxRxMgmt exchange and then publishing its outcome: the response
+// goes to ch on success, the error goes to errc on failure.  The returned
+// error is always nil.
+func (s *SerialSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+
+	rsp, err := s.TxRxMgmt(m, timeout)
+	if err == nil {
+		ch <- rsp
+		return nil
+	}
+
+	errc <- err
+	return nil
+}
+
func (s *SerialSesn) TxCoap(m coap.Message) error {
if !s.isOpen {
return nmxutil.NewSesnClosedError(
diff --git a/nmxact/sesn/sesn.go b/nmxact/sesn/sesn.go
index 1dc0793..f9a40d6 100644
--- a/nmxact/sesn/sesn.go
+++ b/nmxact/sesn/sesn.go
@@ -102,6 +102,7 @@
// * nmxutil.SesnClosedError: session not open.
// * other error
TxRxMgmt(m *nmp.NmpMsg, timeout time.Duration) (nmp.NmpRsp, error)
+ TxRxMgmtAsync(m *nmp.NmpMsg, timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error
// Creates a listener for incoming CoAP messages matching the specified
// criteria.
diff --git a/nmxact/sesn/sesn_util.go b/nmxact/sesn/sesn_util.go
index dea3fcd..13486ed 100644
--- a/nmxact/sesn/sesn_util.go
+++ b/nmxact/sesn/sesn_util.go
@@ -45,6 +45,20 @@
}
}
+// TxRxMgmtAsync starts an asynchronous management exchange on s, retrying
+// the start while it fails with a response-timeout error, for at most
+// o.Tries attempts in total.  Any other start error, or a timeout on the
+// final attempt, is returned to the caller.  Results of a started exchange
+// are delivered on ch / errc by the session.
+func TxRxMgmtAsync(s Sesn, m *nmp.NmpMsg, o TxOptions, ch chan nmp.NmpRsp, errc chan error) error {
+	lastAttempt := o.Tries - 1
+	attempt := 0
+	for {
+		err := s.TxRxMgmtAsync(m, o.Timeout, ch, errc)
+		switch {
+		case err == nil:
+			return nil
+		case !nmxutil.IsRspTimeout(err), attempt >= lastAttempt:
+			// Not retryable, or out of attempts.
+			return err
+		}
+		attempt++
+	}
+}
+
// TxCoap transmits a single CoAP message over the provided session.
func TxCoap(s Sesn, mp nmcoap.MsgParams) error {
msg, err := nmcoap.CreateMsg(s.CoapIsTcp(), mp)
diff --git a/nmxact/udp/udp_sesn.go b/nmxact/udp/udp_sesn.go
index 6986492..45873c6 100644
--- a/nmxact/udp/udp_sesn.go
+++ b/nmxact/udp/udp_sesn.go
@@ -118,6 +118,11 @@
return s.txvr.TxRxMgmt(txRaw, m, s.MtuOut(), timeout)
}
+func (s *UdpSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+ timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error { // async exchange is not implemented for UdpSesn
+ return fmt.Errorf("unsupported") // always fails; nothing is ever sent on ch or errc
+}
+
func (s *UdpSesn) AbortRx(seq uint8) error {
s.txvr.ErrorAll(fmt.Errorf("Rx aborted"))
return nil
diff --git a/nmxact/xact/image.go b/nmxact/xact/image.go
index bebe33d..ea865b6 100644
--- a/nmxact/xact/image.go
+++ b/nmxact/xact/image.go
@@ -25,10 +25,14 @@
pb "gopkg.in/cheggaaa/pb.v1"
+ log "github.com/sirupsen/logrus"
"mynewt.apache.org/newtmgr/nmxact/mgmt"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/sesn"
+ "sync"
+ "sync/atomic"
+ "time"
)
//////////////////////////////////////////////////////////////////////////////
@@ -36,6 +40,12 @@
//////////////////////////////////////////////////////////////////////////////
const IMAGE_UPLOAD_MAX_CHUNK = 512
const IMAGE_UPLOAD_MIN_1ST_CHUNK = 32
+const IMAGE_UPLOAD_STATUS_MISSED = -1 // UpdateTracker status: the chunk's exchange failed (passed by HandleError)
+const IMAGE_UPLOAD_CHUNK_MISSED_WM = -1 // RspMap value stored for a missed chunk; ProcessMissedChunks rewinds to offsets whose value is below this mark
+const IMAGE_UPLOAD_START_WS = 1 // initial window capacity (WCap) used by ImageUploadCmd.Run
+const IMAGE_UPLOAD_DEF_MAX_WS = 5 // default for the CLI "maxwinsize" flag
+const IMAGE_UPLOAD_STATUS_EXPECTED = 0 // UpdateTracker status: a chunk was sent and a response naming this offset is awaited
+const IMAGE_UPLOAD_STATUS_RQ = 1 // UpdateTracker status: a response naming this offset was received
type ImageUploadProgressFn func(c *ImageUploadCmd, r *nmp.ImageUploadRsp)
type ImageUploadCmd struct {
@@ -45,6 +55,17 @@
Upgrade bool
ProgressCb ImageUploadProgressFn
ImageNum int
+ MaxWinSz int
+}
+
+type ImageUploadIntTracker struct { // bookkeeping for the windowed image upload performed by ImageUploadCmd.Run
+ Mutex sync.Mutex // taken by CheckWindow, ProcessMissedChunks, HandleResponse, HandleError and by Run around each send
+ TuneWS bool // while true, HandleResponse grows WCap toward the command's MaxWinSz; HandleError clears it
+ RspMap map[int]int // per-offset counter maintained by UpdateTracker
+ WCount int // chunks currently in flight; incremented by Run, decremented by the handlers
+ WCap int // current window capacity; starts at IMAGE_UPLOAD_START_WS
+ Off int // offset of the next chunk Run will transmit; rewound by ProcessMissedChunks
+ MaxRxOff int64 // highest offset seen in any upload response; Run's loop ends once it reaches len(Data)
}
type ImageUploadResult struct {
@@ -120,7 +141,7 @@
return 0, err
}
- if len(enc) <= s.MtuOut() {
+ if len(enc) <= (s.MtuOut()) {
break
}
@@ -172,8 +193,8 @@
// we can't do much more...
if chunklen <= 0 {
return nil, fmt.Errorf("Cannot create image upload request; "+
- "MTU too low to fit any image data; max-payload-size=%d",
- s.MtuOut())
+ "MTU too low to fit any image data; max-payload-size=%d chunklen %d",
+ s.MtuOut(), chunklen)
}
r := buildImageUploadReq(len(data), hash, upgrade,
@@ -193,34 +214,186 @@
return r, nil
}
-func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
- res := newImageUploadResult()
+// UpdateTracker records an event for the chunk boundary at offset off in
+// t.RspMap.  It does no locking itself.  A status other than the three
+// handled below leaves the map untouched.
+func (t *ImageUploadIntTracker) UpdateTracker(off int, status int) {
+	switch status {
+	case IMAGE_UPLOAD_STATUS_MISSED:
+		// The exchange for this offset failed: flag it as missed so it can
+		// be retransmitted.
+		t.RspMap[off] = IMAGE_UPLOAD_CHUNK_MISSED_WM
+
+	case IMAGE_UPLOAD_STATUS_EXPECTED:
+		// A chunk was just sent.  Its acknowledgement names the *next*
+		// offset (sending offset 0 with length 100 is acknowledged with
+		// offset 100), so one response carrying off is now awaited.
+		t.RspMap[off] = 1
+
+	case IMAGE_UPLOAD_STATUS_RQ:
+		// A response asked for this offset.  The first such response takes
+		// the counter to zero, and the entry is cleaned up later.  A
+		// repeated request means the remote side never received the chunk
+		// at this offset; the counter keeps dropping and the missed-chunk
+		// pass retransmits it.
+		t.RspMap[off] -= 1
+	}
+}
- for off := c.StartOff; off < len(c.Data); {
- r, err := nextImageUploadReq(s, c.Upgrade, c.Data, off, c.ImageNum)
- if err != nil {
- return nil, err
- }
+func (t *ImageUploadIntTracker) CheckWindow() bool { // reports whether another chunk may be sent
+ t.Mutex.Lock()
+ defer t.Mutex.Unlock()
- rsp, err := txReq(s, r.Msg(), &c.CmdBase)
- if err != nil {
- return nil, err
+ return t.WCount < t.WCap // true while fewer chunks are in flight than the window allows
+}
+
+func (t *ImageUploadIntTracker) ProcessMissedChunks() { // rewinds t.Off to a missed offset and drops finished entries from RspMap
+ t.Mutex.Lock()
+ defer t.Mutex.Unlock()
+ for o, c := range t.RspMap { // map iteration order is unspecified; if several offsets qualify, the last one visited ends up in t.Off
+ if c < IMAGE_UPLOAD_CHUNK_MISSED_WM { // counter fell below the missed mark: schedule this offset for retransmission
+ delete(t.RspMap, o)
+ t.Off = o
+ log.Debugf("missed? off %d count %d", o, c)
}
+ // Clean up done chunks: a counter of zero means the awaited response for this offset arrived.
+ if c == 0 {
+ delete(t.RspMap, o)
+ }
+ }
+}
+
+func (t *ImageUploadIntTracker) HandleResponse(c *ImageUploadCmd, rsp nmp.NmpRsp, res *ImageUploadResult) bool { // accounts for one received response; returns true when the window goes from full to having a free slot
+ t.Mutex.Lock()
+ defer t.Mutex.Unlock()
+ wFull := false
+
+ if rsp != nil {
irsp := rsp.(*nmp.ImageUploadRsp)
+ res.Rsps = append(res.Rsps, irsp)
+ t.UpdateTracker(int(irsp.Off), IMAGE_UPLOAD_STATUS_RQ) // the response names the offset the remote side wants next
- off = int(irsp.Off)
-
+ if t.MaxRxOff < int64(irsp.Off) { // remember the furthest offset reported so far
+ t.MaxRxOff = int64(irsp.Off)
+ }
if c.ProgressCb != nil {
c.ProgressCb(c, irsp)
}
-
- res.Rsps = append(res.Rsps, irsp)
- if irsp.Rc != 0 {
- break
- }
}
- return res, nil
+ if t.WCap == t.WCount { // sampled before the adjustments below
+ wFull = true
+ }
+
+ if t.TuneWS && t.WCap < c.MaxWinSz { // grow the window by one per response while tuning is enabled
+ t.WCap += 1
+ }
+ t.WCount -= 1 // this exchange no longer occupies a window slot
+
+ // Indicate the transition from a full window to one with open slot(s).
+ if wFull && t.WCap > t.WCount {
+ return true
+ } else {
+ return false
+ }
+}
+
+func (t *ImageUploadIntTracker) HandleError(off int, err error) bool {
+ /*XXX: there could be an Unauthorize or EOF error when the rate is too high
+ due to a large window, we retry. example:
+ "failed to decrypt message: coap_sec_tunnel: decode GCM fail EOF"
+ Since the error is sent with fmt.Errorf() API, with no code,
+ the string may have to be parsed to know the particular error */
+ t.Mutex.Lock()
+ defer t.Mutex.Unlock()
+ log.Debugf("HandleError off %v error %v", off, err)
+ var wFull = false
+ if t.WCap == t.WCount {
+ wFull = true
+ }
+
+ if t.WCount > IMAGE_UPLOAD_START_WS+1 {
+ t.WCap -= 1
+ }
+ t.TuneWS = false
+ t.WCount -= 1
+ t.UpdateTracker(off, IMAGE_UPLOAD_STATUS_MISSED)
+
+ // Indicate transition from window being full to with open slot(s)
+ if wFull && t.WCap > t.WCount {
+ return true
+ } else {
+ return false
+ }
+}
+
+func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) { // uploads c.Data from c.StartOff using a sliding window of outstanding chunks
+ res := newImageUploadResult()
+ ch := make(chan int) // unbuffered: the send loop parks on it while the window is full
+ rspc := make(chan nmp.NmpRsp, c.MaxWinSz) // NOTE(review): make panics if MaxWinSz is negative — confirm callers validate the flag
+ errc := make(chan error, c.MaxWinSz)
+
+ t := ImageUploadIntTracker{
+ TuneWS: true,
+ WCount: 0,
+ WCap: IMAGE_UPLOAD_START_WS,
+ Off: c.StartOff,
+ RspMap: make(map[int]int),
+ MaxRxOff: 0,
+ }
+
+ for int(atomic.LoadInt64(&t.MaxRxOff)) < len(c.Data) { // NOTE(review): MaxRxOff is written by HandleResponse under t.Mutex, not atomically — confirm this mix is intended
+ // Block if the window is full; a handler goroutine receives from ch once a slot opens.
+ if !t.CheckWindow() {
+ ch <- 1
+ }
+
+ t.ProcessMissedChunks()
+
+ if t.Off == len(c.Data) { // everything has been sent; keep looping until responses advance MaxRxOff or a missed chunk rewinds t.Off
+ continue // NOTE(review): this re-checks without sleeping while the window has free slots
+ }
+
+ t.Mutex.Lock()
+ r, err := nextImageUploadReq(s, c.Upgrade, c.Data, t.Off, c.ImageNum)
+ if err != nil {
+ t.Mutex.Unlock()
+ return nil, err
+ }
+
+ t.Off = (int(r.Off) + len(r.Data)) // next chunk starts right after this one
+
+ // Use up a slot in the window.
+ t.WCount += 1
+ err = txReqAsync(s, r.Msg(), &c.CmdBase, rspc, errc)
+ if err != nil {
+ log.Debugf("err txReqAsync %v", err)
+ t.Mutex.Unlock()
+ break // falls through to the final check, which reports the generic error below
+ }
+ // Mark the offset expected in the response to a successful tx of this chunk, i.e. off + len.
+ t.UpdateTracker(int(r.Off)+len(r.Data), IMAGE_UPLOAD_STATUS_EXPECTED)
+ t.Mutex.Unlock()
+ // One handler goroutine per sent chunk; it consumes a single result from the shared channels.
+ go func(off int) { // NOTE(review): rspc/errc are shared, so the result received here may belong to a different chunk than off — confirm
+ select {
+ case err := <-errc:
+ sig := t.HandleError(off, err)
+ if sig { // window went from full to open: release the parked send loop
+ <-ch
+ }
+ return
+ case rsp := <-rspc:
+ sig := t.HandleResponse(c, rsp, res)
+ if sig { // window went from full to open: release the parked send loop
+ <-ch
+ }
+ return
+ }
+ }(int(r.Off))
+ }
+ // Success only if a response reported the offset equal to the full image length.
+ if int(t.MaxRxOff) == len(c.Data) {
+ return res, nil
+ } else {
+ return nil, fmt.Errorf("ImageUpload unexpected error after %d/%d bytes",
+ t.MaxRxOff, len(c.Data))
+ }
}
//////////////////////////////////////////////////////////////////////////////
@@ -250,6 +423,7 @@
Upgrade bool
ProgressBar *pb.ProgressBar
ImageNum int
+ MaxWinSz int
}
type ImageUpgradeResult struct {
@@ -320,13 +494,18 @@
}
for {
+ var opt = sesn.TxOptions{
+ Timeout: 3 * time.Second,
+ Tries: 1,
+ }
cmd := NewImageUploadCmd()
cmd.Data = c.Data
cmd.StartOff = startOff
cmd.Upgrade = c.Upgrade
cmd.ProgressCb = progressCb
cmd.ImageNum = c.ImageNum
- cmd.SetTxOptions(c.TxOptions())
+ cmd.SetTxOptions(opt)
+ cmd.MaxWinSz = c.MaxWinSz
res, err := cmd.Run(s)
if err == nil {
diff --git a/nmxact/xact/xact.go b/nmxact/xact/xact.go
index 1f34fcb..8b2ef10 100644
--- a/nmxact/xact/xact.go
+++ b/nmxact/xact/xact.go
@@ -20,6 +20,7 @@
package xact
import (
+ log "github.com/sirupsen/logrus"
"mynewt.apache.org/newtmgr/nmxact/nmp"
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
@@ -45,3 +46,26 @@
return rsp, nil
}
+
+// txReqAsync starts an asynchronous exchange of m over s on behalf of command
+// c.  If the command already carries an abort error, that error is returned
+// and nothing is sent.  c.curNmpSeq and c.curSesn are set only for the
+// duration of this call and are cleared again before it returns.  The
+// outcome of a started exchange arrives on ch or errc.
+func txReqAsync(s sesn.Sesn, m *nmp.NmpMsg, c *CmdBase, ch chan nmp.NmpRsp, errc chan error) error {
+
+	if c.abortErr != nil {
+		return c.abortErr
+	}
+
+	// Publish the in-progress request on the command, and undo that on the
+	// way out.
+	c.curNmpSeq, c.curSesn = m.Hdr.Seq, s
+	defer func() {
+		c.curNmpSeq, c.curSesn = 0, nil
+	}()
+
+	if err := sesn.TxRxMgmtAsync(s, m, c.TxOptions(), ch, errc); err != nil {
+		log.Debugf("error %v TxRxMgmtAsync sesn %v seq %d",
+			err, c.curSesn, c.curNmpSeq)
+		return err
+	}
+	return nil
+}