Merge pull request #168 from ccollins476ad/freeze

Freeze filter for `image upload` chunk length calculation
diff --git a/newtmgr/bll/bll_sesn.go b/newtmgr/bll/bll_sesn.go
index 91dc81b..d062140 100644
--- a/newtmgr/bll/bll_sesn.go
+++ b/newtmgr/bll/bll_sesn.go
@@ -392,6 +392,27 @@
 	return s.txvr.TxRxMgmt(txRaw, m, s.MtuOut(), timeout)
 }
 
+// TxRxMgmtAsync starts a management exchange without waiting for the reply.
+// A non-nil return means the exchange was not started; otherwise the
+// transceiver reports the response on ch or a failure on errc.
+func (s *BllSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	if !s.IsOpen() {
+		return nmxutil.NewSesnClosedError(
+			"Attempt to transmit over closed BLE session")
+	}
+	if s.nmpReqChr == nil || s.nmpRspChr == nil {
+		return fmt.Errorf("Cannot send NMP request; peer doesn't " +
+			"support request or response characteristic")
+	}
+
+	txRaw := func(b []byte) error {
+		return s.txWriteCharacteristic(s.nmpReqChr, b, true)
+	}
+	// Surface encode/transmit failures instead of always reporting success.
+	return s.txvr.TxRxMgmtAsync(txRaw, m, s.MtuOut(), timeout, ch, errc)
+}
+
 func (s *BllSesn) TxCoap(m coap.Message) error {
 	txRaw := func(b []byte) error {
 		return s.txWriteCharacteristic(s.resReqChr, b, !s.cfg.WriteRsp)
diff --git a/newtmgr/cli/image.go b/newtmgr/cli/image.go
index 620cf0d..3e9bf21 100644
--- a/newtmgr/cli/image.go
+++ b/newtmgr/cli/image.go
@@ -45,6 +45,7 @@
 var noerase bool
 var upgrade bool
 var imageNum int
+var maxWinSz int
 
 func imageFlagsStr(image nmp.ImageStateEntry) string {
 	strs := []string{}
@@ -200,9 +201,12 @@
 	c.ProgressBar.SetUnits(pb.U_BYTES)
 	c.ProgressBar.ShowSpeed = true
 	c.LastOff = 0
+	c.MaxWinSz = maxWinSz
 	c.ProgressCb = func(cmd *xact.ImageUploadCmd, rsp *nmp.ImageUploadRsp) {
-		c.ProgressBar.Add(int(rsp.Off - c.LastOff))
-		c.LastOff = rsp.Off
+		if rsp.Off > c.LastOff {
+			c.ProgressBar.Add(int(rsp.Off - c.LastOff))
+			c.LastOff = rsp.Off
+		}
 	}
 
 	res, err := c.Run(s)
@@ -402,7 +406,7 @@
 		Run:     imageUploadCmd,
 	}
 	uploadCmd.PersistentFlags().BoolVarP(&noerase,
-		"noerase", "e", false,
+		"noerase", "e", true,
 		"Don't send specific image erase command to start with")
 	uploadCmd.PersistentFlags().BoolVarP(&upgrade,
 		"upgrade", "u", false,
@@ -411,6 +415,10 @@
 	uploadCmd.PersistentFlags().IntVarP(&imageNum,
 		"image", "n", 0,
 		"In a multi-image system, which image should be uploaded")
+	uploadCmd.PersistentFlags().IntVarP(&maxWinSz,
+		"maxwinsize", "w", xact.IMAGE_UPLOAD_DEF_MAX_WS,
+		"Set the maximum size for the window of outstanding chunks in transit. "+
+			"caution:higher num may not translate to better perf and may result in errors")
 	imageCmd.AddCommand(uploadCmd)
 
 	coreListCmd := &cobra.Command{
diff --git a/nmxact/mgmt/transceiver.go b/nmxact/mgmt/transceiver.go
index 3fb1d5b..74ec51a 100644
--- a/nmxact/mgmt/transceiver.go
+++ b/nmxact/mgmt/transceiver.go
@@ -114,6 +114,54 @@
 	}
 }
 
+// txRxNmpAsync sends req as a plain NMP request and returns once it has
+// been written; a goroutine then relays the response to ch, or a listener
+// error or timeout to errc.  On a non-nil return nothing is relayed.
+func (t *Transceiver) txRxNmpAsync(txCb TxFn, req *nmp.NmpMsg, mtu int,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	nl, err := t.nd.AddListener(req.Hdr.Seq)
+	if err != nil {
+		return err
+	}
+	// From here on every early return must unregister the listener again.
+	b, err := nmp.EncodeNmpPlain(req)
+	if err != nil {
+		t.nd.RemoveListener(req.Hdr.Seq)
+		return err
+	}
+	log.Debugf("Tx NMP async request: seq %d %s", req.Hdr.Seq, hex.Dump(b))
+	if t.isTcp == false && len(b) > mtu {
+		t.nd.RemoveListener(req.Hdr.Seq)
+		return fmt.Errorf("Request too big")
+	}
+	for _, frag := range nmxutil.Fragment(b, mtu) {
+		if err := txCb(frag); err != nil {
+			t.nd.RemoveListener(req.Hdr.Seq)
+			return err
+		}
+	}
+	// Now wait for NMP response.
+	go func() {
+		defer t.nd.RemoveListener(req.Hdr.Seq)
+		for {
+			select {
+			case err := <-nl.ErrChan:
+				errc <- err
+				return
+			case rsp := <-nl.RspChan:
+				ch <- rsp
+				return
+			case _, ok := <-nl.AfterTimeout(timeout):
+				if ok {
+					errc <- nmxutil.NewRspTimeoutError("NMP timeout")
+					return
+				}
+			}
+		}
+	}()
+	return nil
+}
+
 func (t *Transceiver) txRxOmp(txCb TxFn, req *nmp.NmpMsg, mtu int,
 	timeout time.Duration) (nmp.NmpRsp, error) {
 
@@ -160,6 +208,62 @@
 	}
 }
 
+// txRxOmpAsync sends req OMP-encoded (TCP or datagram form, per t.isTcp)
+// and returns once it is written; a goroutine then relays the response to
+// ch, or a listener error or timeout to errc.  On error nothing is relayed.
+func (t *Transceiver) txRxOmpAsync(txCb TxFn, req *nmp.NmpMsg, mtu int,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	seq := req.Hdr.Seq
+	nl, err := t.od.AddNmpListener(seq)
+	if err != nil {
+		return err
+	}
+	var b []byte
+	if t.isTcp {
+		b, err = omp.EncodeOmpTcp(t.txFilterCb, req)
+	} else {
+		b, err = omp.EncodeOmpDgram(t.txFilterCb, req)
+	}
+	if err != nil {
+		t.od.RemoveNmpListener(seq) // don't leak the listener on failure
+		return err
+	}
+	log.Debugf("Tx OMP request: %v %s", seq, hex.Dump(b))
+	if t.isTcp == false && len(b) > mtu {
+		t.od.RemoveNmpListener(seq)
+		return fmt.Errorf("Request too big")
+	}
+	for _, frag := range nmxutil.Fragment(b, mtu) {
+		if err := txCb(frag); err != nil {
+			log.Debugf("txCb error %v", err)
+			t.od.RemoveNmpListener(seq)
+			return err
+		}
+	}
+
+	// Now wait for NMP response.
+	go func() {
+		defer t.od.RemoveNmpListener(seq)
+		for {
+			select {
+			case err := <-nl.ErrChan:
+				log.Debugf("Error reported %v seq %v", err, seq)
+				errc <- err
+				return
+			case rsp := <-nl.RspChan:
+				ch <- rsp
+				return
+			case _, ok := <-nl.AfterTimeout(timeout):
+				if ok {
+					errc <- fmt.Errorf("Request timedout")
+					return
+				}
+			}
+		}
+	}()
+	return nil
+}
+
 func (t *Transceiver) TxRxMgmt(txCb TxFn, req *nmp.NmpMsg, mtu int,
 	timeout time.Duration) (nmp.NmpRsp, error) {
 
@@ -170,6 +274,16 @@
 	}
 }
 
+// TxRxMgmtAsync starts a non-blocking management exchange, using the plain
+// NMP path when t.nd is non-nil and the OMP path otherwise.
+func (t *Transceiver) TxRxMgmtAsync(txCb TxFn, req *nmp.NmpMsg, mtu int,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	if t.nd == nil {
+		return t.txRxOmpAsync(txCb, req, mtu, timeout, ch, errc)
+	}
+	return t.txRxNmpAsync(txCb, req, mtu, timeout, ch, errc)
+}
+
 func (t *Transceiver) TxCoap(txCb TxFn, req coap.Message, mtu int) error {
 	b, err := nmcoap.Encode(req)
 	if err != nil {
diff --git a/nmxact/mtech_lora/mtech_lora_sesn.go b/nmxact/mtech_lora/mtech_lora_sesn.go
index 5dc796a..200207f 100644
--- a/nmxact/mtech_lora/mtech_lora_sesn.go
+++ b/nmxact/mtech_lora/mtech_lora_sesn.go
@@ -316,6 +316,18 @@
 	return s.txvr.TxRxMgmt(txFunc, m, s.MtuOut(), timeout)
 }
 
+// TxRxMgmtAsync runs the blocking TxRxMgmt and then posts the response to ch
+// or the error to errc; it returns nil only after that send has completed.
+func (s *LoraSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	if rsp, err := s.TxRxMgmt(m, timeout); err != nil {
+		errc <- err
+	} else {
+		ch <- rsp
+	}
+	return nil
+}
+
 func (s *LoraSesn) AbortRx(seq uint8) error {
 	s.txvr.ErrorAll(fmt.Errorf("Rx aborted"))
 	return nil
diff --git a/nmxact/nmble/ble_sesn.go b/nmxact/nmble/ble_sesn.go
index 866f961..a9212ce 100644
--- a/nmxact/nmble/ble_sesn.go
+++ b/nmxact/nmble/ble_sesn.go
@@ -110,6 +110,12 @@
 	return s.Ns.TxRxMgmt(m, timeout)
 }
 
+// TxRxMgmtAsync forwards the asynchronous management request to s.Ns.
+func (s *BleSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	return s.Ns.TxRxMgmtAsync(m, timeout, ch, errc)
+}
+
 func (s *BleSesn) TxCoap(m coap.Message) error {
 	return s.Ns.TxCoap(m)
 }
diff --git a/nmxact/nmble/naked_sesn.go b/nmxact/nmble/naked_sesn.go
index 9129b05..ad6d4ba 100644
--- a/nmxact/nmble/naked_sesn.go
+++ b/nmxact/nmble/naked_sesn.go
@@ -351,6 +351,11 @@
 	return rsp, nil
 }
 
+func (s *NakedSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	return fmt.Errorf("unsupported") // stub: always fails; m, ch and errc are never used
+}
+
 func (s *NakedSesn) ListenCoap(
 	mc nmcoap.MsgCriteria) (*nmcoap.Listener, error) {
 
diff --git a/nmxact/nmserial/serial_sesn.go b/nmxact/nmserial/serial_sesn.go
index 46079d8..8bca77c 100644
--- a/nmxact/nmserial/serial_sesn.go
+++ b/nmxact/nmserial/serial_sesn.go
@@ -213,6 +213,18 @@
 	return s.txvr.TxRxMgmt(txFn, m, s.MtuOut(), timeout)
 }
 
+// TxRxMgmtAsync runs the blocking TxRxMgmt and then posts the response to ch
+// or the error to errc; it returns nil only after that send has completed.
+func (s *SerialSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	if rsp, err := s.TxRxMgmt(m, timeout); err != nil {
+		errc <- err
+	} else {
+		ch <- rsp
+	}
+	return nil
+}
+
 func (s *SerialSesn) TxCoap(m coap.Message) error {
 	if !s.isOpen {
 		return nmxutil.NewSesnClosedError(
diff --git a/nmxact/sesn/sesn.go b/nmxact/sesn/sesn.go
index 1dc0793..f9a40d6 100644
--- a/nmxact/sesn/sesn.go
+++ b/nmxact/sesn/sesn.go
@@ -102,6 +102,7 @@
 	//     * nmxutil.SesnClosedError: session not open.
 	//     * other error
 	TxRxMgmt(m *nmp.NmpMsg, timeout time.Duration) (nmp.NmpRsp, error)
+	TxRxMgmtAsync(m *nmp.NmpMsg, timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error
 
 	// Creates a listener for incoming CoAP messages matching the specified
 	// criteria.
diff --git a/nmxact/sesn/sesn_util.go b/nmxact/sesn/sesn_util.go
index dea3fcd..13486ed 100644
--- a/nmxact/sesn/sesn_util.go
+++ b/nmxact/sesn/sesn_util.go
@@ -45,6 +45,20 @@
 	}
 }
 
+// TxRxMgmtAsync starts an asynchronous management exchange on s.  The call
+// is repeated only when s.TxRxMgmtAsync itself returns a response-timeout
+// error and attempts remain (o.Tries in total); success and every other
+// error are returned immediately.
+func TxRxMgmtAsync(s Sesn, m *nmp.NmpMsg, o TxOptions, ch chan nmp.NmpRsp, errc chan error) error {
+	for attempt := 0; ; attempt++ {
+		// Failures delivered later on errc are not seen, or retried, here.
+		err := s.TxRxMgmtAsync(m, o.Timeout, ch, errc)
+		if err == nil || !nmxutil.IsRspTimeout(err) || attempt >= o.Tries-1 {
+			return err
+		}
+	}
+}
+
 // TxCoap transmits a single CoAP message over the provided session.
 func TxCoap(s Sesn, mp nmcoap.MsgParams) error {
 	msg, err := nmcoap.CreateMsg(s.CoapIsTcp(), mp)
diff --git a/nmxact/udp/udp_sesn.go b/nmxact/udp/udp_sesn.go
index 6986492..45873c6 100644
--- a/nmxact/udp/udp_sesn.go
+++ b/nmxact/udp/udp_sesn.go
@@ -118,6 +118,11 @@
 	return s.txvr.TxRxMgmt(txRaw, m, s.MtuOut(), timeout)
 }
 
+func (s *UdpSesn) TxRxMgmtAsync(m *nmp.NmpMsg,
+	timeout time.Duration, ch chan nmp.NmpRsp, errc chan error) error {
+	return fmt.Errorf("unsupported") // stub: always fails; m, ch and errc are never used
+}
+
 func (s *UdpSesn) AbortRx(seq uint8) error {
 	s.txvr.ErrorAll(fmt.Errorf("Rx aborted"))
 	return nil
diff --git a/nmxact/xact/image.go b/nmxact/xact/image.go
index bebe33d..ea865b6 100644
--- a/nmxact/xact/image.go
+++ b/nmxact/xact/image.go
@@ -25,10 +25,14 @@
 
 	pb "gopkg.in/cheggaaa/pb.v1"
 
+	log "github.com/sirupsen/logrus"
 	"mynewt.apache.org/newtmgr/nmxact/mgmt"
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
 	"mynewt.apache.org/newtmgr/nmxact/nmxutil"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
+	"sync"
+	"sync/atomic"
+	"time"
 )
 
 //////////////////////////////////////////////////////////////////////////////
@@ -36,6 +40,12 @@
 //////////////////////////////////////////////////////////////////////////////
 const IMAGE_UPLOAD_MAX_CHUNK = 512
 const IMAGE_UPLOAD_MIN_1ST_CHUNK = 32
+const IMAGE_UPLOAD_STATUS_MISSED = -1   // UpdateTracker status: request failed; mark the offset as missed
+const IMAGE_UPLOAD_CHUNK_MISSED_WM = -1 // RspMap value stored for a missed offset; counts below it trigger a resend
+const IMAGE_UPLOAD_START_WS = 1         // initial window capacity (WCap) used by ImageUploadCmd.Run
+const IMAGE_UPLOAD_DEF_MAX_WS = 5       // default value of the CLI "maxwinsize" flag
+const IMAGE_UPLOAD_STATUS_EXPECTED = 0  // UpdateTracker status: chunk sent; a response naming this offset is expected
+const IMAGE_UPLOAD_STATUS_RQ = 1        // UpdateTracker status: a response requested this offset
 
 type ImageUploadProgressFn func(c *ImageUploadCmd, r *nmp.ImageUploadRsp)
 type ImageUploadCmd struct {
@@ -45,6 +55,17 @@
 	Upgrade    bool
 	ProgressCb ImageUploadProgressFn
 	ImageNum   int
+	MaxWinSz   int
+}
+
+type ImageUploadIntTracker struct { // sliding-window bookkeeping for one ImageUploadCmd.Run
+	MaxRxOff int64       // highest response offset seen; accessed atomically, so kept first for 64-bit alignment on 32-bit CPUs
+	Mutex    sync.Mutex  // held by the tracker methods and by Run while the fields below change
+	TuneWS   bool        // while true, HandleResponse grows WCap; cleared by HandleError
+	RspMap   map[int]int // per-offset counter maintained by UpdateTracker
+	WCount   int         // requests currently occupying the window
+	WCap     int         // current window capacity
+	Off      int         // offset of the next chunk to transmit
 }
 
 type ImageUploadResult struct {
@@ -120,7 +141,7 @@
 			return 0, err
 		}
 
-		if len(enc) <= s.MtuOut() {
+		if len(enc) <= (s.MtuOut()) {
 			break
 		}
 
@@ -172,8 +193,8 @@
 	// we can't do much more...
 	if chunklen <= 0 {
 		return nil, fmt.Errorf("Cannot create image upload request; "+
-			"MTU too low to fit any image data; max-payload-size=%d",
-			s.MtuOut())
+			"MTU too low to fit any image data; max-payload-size=%d chunklen %d",
+			s.MtuOut(), chunklen)
 	}
 
 	r := buildImageUploadReq(len(data), hash, upgrade,
@@ -193,34 +214,186 @@
 	return r, nil
 }
 
-func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
-	res := newImageUploadResult()
+// UpdateTracker records an event for the bookkeeping entry at offset off.
+// It takes no lock itself; the callers visible in this file hold t.Mutex.
+// A status other than the three IMAGE_UPLOAD_STATUS_* values is ignored.
+// Entries are later consumed by ProcessMissedChunks.
+func (t *ImageUploadIntTracker) UpdateTracker(off int, status int) {
+	switch status {
+	case IMAGE_UPLOAD_STATUS_MISSED:
+		// Transmission failed: flag the offset for retransmission.
+		t.RspMap[off] = IMAGE_UPLOAD_CHUNK_MISSED_WM
+	case IMAGE_UPLOAD_STATUS_EXPECTED:
+		// A chunk was transmitted and its acknowledgement should name off
+		// as the next offset (e.g. after offset 0, len 100, expect 100).
+		t.RspMap[off] = 1
+	case IMAGE_UPLOAD_STATUS_RQ:
+		// The peer asked for off.  An expected entry drops to zero and is
+		// cleaned up later; repeated requests push the count further down
+		// until the missed chunk processing routine retransmits it.
+		t.RspMap[off]--
+	}
+}
 
-	for off := c.StartOff; off < len(c.Data); {
-		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, off, c.ImageNum)
-		if err != nil {
-			return nil, err
-		}
+// CheckWindow reports, under t.Mutex, whether WCount is still below WCap.
+func (t *ImageUploadIntTracker) CheckWindow() bool {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
-		rsp, err := txReq(s, r.Msg(), &c.CmdBase)
-		if err != nil {
-			return nil, err
+	return t.WCount < t.WCap
+}
+
+func (t *ImageUploadIntTracker) ProcessMissedChunks() { // rewinds t.Off to a missed offset and prunes settled entries
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+	for o, c := range t.RspMap { // NOTE(review): map order is random; if several offsets qualify, the last one visited wins
+		if c < IMAGE_UPLOAD_CHUNK_MISSED_WM {
+			delete(t.RspMap, o)
+			t.Off = o
+			log.Debugf("missed? off %d count %d", o, c)
 		}
+		// clean up done chunks: a count of 0 means the expected response arrived once
+		if c == 0 {
+			delete(t.RspMap, o)
+		}
+	}
+}
+
+// HandleResponse accounts for one completed request: it records rsp (when
+// non-nil), frees a window slot, and grows WCap while TuneWS is set.  It
+// returns true when the window went from full to having a free slot.
+func (t *ImageUploadIntTracker) HandleResponse(c *ImageUploadCmd, rsp nmp.NmpRsp, res *ImageUploadResult) bool {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+	wFull := false
+	if rsp != nil {
 		irsp := rsp.(*nmp.ImageUploadRsp)
+		res.Rsps = append(res.Rsps, irsp)
+		t.UpdateTracker(int(irsp.Off), IMAGE_UPLOAD_STATUS_RQ)
 
-		off = int(irsp.Off)
-
+		if t.MaxRxOff < int64(irsp.Off) {
+			atomic.StoreInt64(&t.MaxRxOff, int64(irsp.Off)) // pairs with the atomic load in Run
+		}
 		if c.ProgressCb != nil {
 			c.ProgressCb(c, irsp)
 		}
-
-		res.Rsps = append(res.Rsps, irsp)
-		if irsp.Rc != 0 {
-			break
-		}
 	}
 
-	return res, nil
+	if t.WCap == t.WCount {
+		wFull = true
+	}
+	if t.TuneWS && t.WCap < c.MaxWinSz {
+		t.WCap += 1
+	}
+	t.WCount -= 1
+	// Indicate transition from window being full to with open slot(s)
+	if wFull && t.WCap > t.WCount {
+		return true
+	} else {
+		return false
+	}
+}
+
+// HandleError accounts for a failed request for the chunk starting at off.
+// It frees the window slot, stops further window growth (TuneWS), shrinks
+// WCap when more than IMAGE_UPLOAD_START_WS+1 requests were in flight, and
+// marks off as missed.  It returns true when the window went from full to
+// having a free slot.
+//
+// XXX: with a large window the rate can be too high and produce an
+// unauthorized or EOF error, e.g.
+// "failed to decrypt message: coap_sec_tunnel: decode GCM fail EOF"; such
+// errors are retried like any other.  Because they are built with
+// fmt.Errorf() and carry no code, telling them apart would require parsing
+// the message string.
+func (t *ImageUploadIntTracker) HandleError(off int, err error) bool {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+	log.Debugf("HandleError off %v error %v", off, err)
+
+	wasFull := t.WCap == t.WCount
+	if t.WCount > IMAGE_UPLOAD_START_WS+1 {
+		t.WCap--
+	}
+	t.TuneWS = false
+	t.WCount--
+	t.UpdateTracker(off, IMAGE_UPLOAD_STATUS_MISSED)
+
+	// Signal only the transition from a full window to one with room.
+	return wasFull && t.WCap > t.WCount
+}
+
+// Run uploads c.Data over s using a sliding window of asynchronous chunk
+// requests that starts at IMAGE_UPLOAD_START_WS and may grow to c.MaxWinSz.
+// It succeeds once a response reports an offset equal to len(c.Data).
+// NOTE(review): unlike the loop it replaces, this never inspects irsp.Rc.
+func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
+	res := newImageUploadResult()
+	ch := make(chan int)
+	// Never pass MaxWinSz < 1 to make(): a negative size panics, and with no
+	// buffer a session that delivers before returning would block forever.
+	chanCap := c.MaxWinSz
+	if chanCap < 1 {
+		chanCap = 1
+	}
+	rspc := make(chan nmp.NmpRsp, chanCap)
+	errc := make(chan error, chanCap)
+
+	t := ImageUploadIntTracker{
+		TuneWS: true,
+		WCap:   IMAGE_UPLOAD_START_WS,
+		Off:    c.StartOff,
+		RspMap: make(map[int]int),
+	}
+
+	for int(atomic.LoadInt64(&t.MaxRxOff)) < len(c.Data) {
+		// Block if window is full
+		if !t.CheckWindow() {
+			ch <- 1
+		}
+		t.ProcessMissedChunks()
+		if t.Off == len(c.Data) {
+			continue
+		}
+
+		t.Mutex.Lock()
+		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, t.Off, c.ImageNum)
+		if err != nil {
+			t.Mutex.Unlock()
+			return nil, err
+		}
+		t.Off = (int(r.Off) + len(r.Data))
+		// Use up a chunk in window
+		t.WCount += 1
+		err = txReqAsync(s, r.Msg(), &c.CmdBase, rspc, errc)
+		if err != nil {
+			log.Debugf("err txReqAsync %v", err)
+			t.Mutex.Unlock()
+			break
+		}
+		// Mark the expected offset in successful tx of this chunk. i.e off + len
+		t.UpdateTracker(int(r.Off)+len(r.Data), IMAGE_UPLOAD_STATUS_EXPECTED)
+		t.Mutex.Unlock()
+
+		go func(off int) {
+			var sig bool
+			select {
+			case err := <-errc:
+				sig = t.HandleError(off, err)
+			case rsp := <-rspc:
+				sig = t.HandleResponse(c, rsp, res)
+			}
+			if sig {
+				<-ch
+			}
+		}(int(r.Off))
+	}
+
+	rxOff := atomic.LoadInt64(&t.MaxRxOff)
+	if int(rxOff) == len(c.Data) {
+		return res, nil
+	}
+	return nil, fmt.Errorf("ImageUpload unexpected error after %d/%d bytes",
+		rxOff, len(c.Data))
 }
 
 //////////////////////////////////////////////////////////////////////////////
@@ -250,6 +423,7 @@
 	Upgrade     bool
 	ProgressBar *pb.ProgressBar
 	ImageNum    int
+	MaxWinSz    int
 }
 
 type ImageUpgradeResult struct {
@@ -320,13 +494,18 @@
 	}
 
 	for {
+		var opt = sesn.TxOptions{
+			Timeout: 3 * time.Second,
+			Tries:   1,
+		}
 		cmd := NewImageUploadCmd()
 		cmd.Data = c.Data
 		cmd.StartOff = startOff
 		cmd.Upgrade = c.Upgrade
 		cmd.ProgressCb = progressCb
 		cmd.ImageNum = c.ImageNum
-		cmd.SetTxOptions(c.TxOptions())
+		cmd.SetTxOptions(opt)
+		cmd.MaxWinSz = c.MaxWinSz
 
 		res, err := cmd.Run(s)
 		if err == nil {
diff --git a/nmxact/xact/xact.go b/nmxact/xact/xact.go
index 1f34fcb..8b2ef10 100644
--- a/nmxact/xact/xact.go
+++ b/nmxact/xact/xact.go
@@ -20,6 +20,7 @@
 package xact
 
 import (
+	log "github.com/sirupsen/logrus"
 	"mynewt.apache.org/newtmgr/nmxact/nmp"
 	"mynewt.apache.org/newtmgr/nmxact/sesn"
 )
@@ -45,3 +46,26 @@
 
 	return rsp, nil
 }
+
+// txReqAsync starts an asynchronous request for command c on session s,
+// failing at once with c.abortErr when that is set.  c.curSesn and
+// c.curNmpSeq are set only for the duration of this call.
+func txReqAsync(s sesn.Sesn, m *nmp.NmpMsg, c *CmdBase, ch chan nmp.NmpRsp, errc chan error) error {
+	if c.abortErr != nil {
+		return c.abortErr
+	}
+
+	c.curSesn = s
+	c.curNmpSeq = m.Hdr.Seq
+	defer func() {
+		c.curSesn = nil
+		c.curNmpSeq = 0
+	}()
+
+	err := sesn.TxRxMgmtAsync(s, m, c.TxOptions(), ch, errc)
+	if err != nil {
+		log.Debugf("error %v TxRxMgmtAsync sesn %v seq %d",
+			err, c.curSesn, c.curNmpSeq)
+	}
+	return err
+}