Make nack tracker tests more robust. (#122)
* Make nack tracker tests more robust.
* Fix time corner cases in tests.
* Fix test race condition.
* Fix comment.
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 733114a..4dac66e 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -18,44 +18,64 @@
package pulsar
import (
- "github.com/stretchr/testify/assert"
"sort"
"sync"
"testing"
"time"
+
+ "github.com/stretchr/testify/assert"
)
+const testNackDelay = 300 * time.Millisecond
+
type nackMockedConsumer struct {
- sync.Mutex
- cond *sync.Cond
+ ch chan messageID
+ closed bool
+ lock sync.Mutex
msgIds []messageID
}
-func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
- nmc.Lock()
- if nmc.msgIds == nil {
- nmc.msgIds = msgIds
- sort.Slice(msgIds, func(i, j int) bool {
- return msgIds[i].ledgerID < msgIds[j].entryID
- })
- nmc.cond.Signal()
+func newNackMockedConsumer() *nackMockedConsumer {
+ t := &nackMockedConsumer{
+ ch: make(chan messageID, 10),
}
-
- nmc.Unlock()
+ go func() {
+ // since the client ticks at an interval of delay / 3
+ // wait another interval to ensure we get all messages
+ time.Sleep(testNackDelay + 101 * time.Millisecond)
+ t.lock.Lock()
+ defer t.lock.Unlock()
+ t.closed = true
+ close(t.ch)
+ }()
+ return t
}
-func (nmc *nackMockedConsumer) Wait() []messageID {
- nmc.Lock()
- defer nmc.Unlock()
- nmc.cond.Wait()
+func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
+ nmc.lock.Lock()
+ defer nmc.lock.Unlock()
+ if nmc.closed {
+ return
+ }
+ for _, id := range msgIds {
+ nmc.ch <- id
+ }
+}
- return nmc.msgIds
+func sortMessageIds(msgIds []messageID) []messageID {
+ sort.Slice(msgIds, func(i, j int) bool {
+ return msgIds[i].ledgerID < msgIds[j].entryID
+ })
+ return msgIds
+}
+
+func (nmc *nackMockedConsumer) Wait() <- chan messageID {
+ return nmc.ch
}
func TestNacksTracker(t *testing.T) {
- nmc := &nackMockedConsumer{}
- nmc.cond = sync.NewCond(nmc)
- nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+ nmc := newNackMockedConsumer()
+ nacks := newNegativeAcksTracker(nmc, testNackDelay)
nacks.Add(&messageID{
ledgerID: 1,
@@ -69,7 +89,11 @@
batchIdx: 1,
})
- msgIds := nmc.Wait()
+ msgIds := make([]messageID, 0)
+ for id := range nmc.Wait() {
+ msgIds = append(msgIds, id)
+ }
+ msgIds = sortMessageIds(msgIds)
assert.Equal(t, 2, len(msgIds))
assert.Equal(t, int64(1), msgIds[0].ledgerID)
@@ -81,9 +105,8 @@
}
func TestNacksWithBatchesTracker(t *testing.T) {
- nmc := &nackMockedConsumer{}
- nmc.cond = sync.NewCond(nmc)
- nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+ nmc := newNackMockedConsumer()
+ nacks := newNegativeAcksTracker(nmc, testNackDelay)
nacks.Add(&messageID{
ledgerID: 1,
@@ -109,7 +132,11 @@
batchIdx: 1,
})
- msgIds := nmc.Wait()
+ msgIds := make([]messageID, 0)
+ for id := range nmc.Wait() {
+ msgIds = append(msgIds, id)
+ }
+ msgIds = sortMessageIds(msgIds)
assert.Equal(t, 2, len(msgIds))
assert.Equal(t, int64(1), msgIds[0].ledgerID)