blob: 0a794f64033e0e541010c149c024d3cbdd734d36 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"fmt"
"sort"
"sync"
"sync/atomic"
"testing"
"time"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/stretchr/testify/assert"
)
func TestNoCacheTracker(t *testing.T) {
tests := []AckGroupingOptions{
{
MaxSize: 0,
MaxTime: 10 * time.Hour,
},
{
MaxSize: 1,
MaxTime: 10 * time.Hour,
},
}
for _, option := range tests {
t.Run(fmt.Sprintf("TestAckImmediately_size_%v_time_%vs", option.MaxSize, option.MaxTime.Seconds()),
func(t *testing.T) {
ledgerID0 := int64(-1)
ledgerID1 := int64(-1)
tracker := newAckGroupingTracker(&option,
func(id MessageID) { ledgerID0 = id.LedgerID() },
func(id MessageID) { ledgerID1 = id.LedgerID() },
nil)
tracker.add(&messageID{ledgerID: 1})
assert.Equal(t, atomic.LoadInt64(&ledgerID0), int64(1))
tracker.addCumulative(&messageID{ledgerID: 2})
assert.Equal(t, atomic.LoadInt64(&ledgerID1), int64(2))
})
}
}
type mockAcker struct {
sync.Mutex
ledgerIDs []int64
cumulativeLedgerID int64
}
func (a *mockAcker) ack(ids []*pb.MessageIdData) {
defer a.Unlock()
a.Lock()
for _, id := range ids {
a.ledgerIDs = append(a.ledgerIDs, int64(*id.LedgerId))
}
}
func (a *mockAcker) ackCumulative(id MessageID) {
atomic.StoreInt64(&a.cumulativeLedgerID, id.LedgerID())
}
func (a *mockAcker) getLedgerIDs() []int64 {
defer a.Unlock()
a.Lock()
sort.Slice(a.ledgerIDs, func(i, j int) bool { return a.ledgerIDs[i] < a.ledgerIDs[j] })
return a.ledgerIDs
}
func (a *mockAcker) getCumulativeLedgerID() int64 {
return atomic.LoadInt64(&a.cumulativeLedgerID)
}
func (a *mockAcker) reset() {
a.ledgerIDs = make([]int64, 0)
a.cumulativeLedgerID = int64(0)
}
func TestCachedTracker(t *testing.T) {
var acker mockAcker
tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil,
func(id MessageID) { acker.ackCumulative(id) }, func(ids []*pb.MessageIdData) { acker.ack(ids) })
tracker.add(&messageID{ledgerID: 1})
tracker.add(&messageID{ledgerID: 2})
for i := 1; i <= 2; i++ {
assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(i)}))
}
assert.Equal(t, 0, len(acker.getLedgerIDs()))
tracker.add(&messageID{ledgerID: 3})
assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 0 },
10*time.Millisecond, 2*time.Millisecond)
assert.Equal(t, []int64{1, 2, 3}, acker.getLedgerIDs())
for i := 1; i <= 3; i++ {
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: int64(i)}))
}
tracker.add(&messageID{ledgerID: 4})
// 4 won't be added because the cache is not full
assert.Equal(t, []int64{1, 2, 3}, acker.getLedgerIDs())
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 5}))
tracker.addCumulative(&messageID{ledgerID: 5})
for i := 0; i <= 5; i++ {
assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(i)}))
}
assert.Equal(t, int64(5), acker.getCumulativeLedgerID())
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: int64(6)}))
tracker.flush()
assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 3 },
10*time.Millisecond, 2*time.Millisecond)
assert.Equal(t, []int64{1, 2, 3, 4}, acker.getLedgerIDs())
}
func TestTimedTrackerIndividualAck(t *testing.T) {
var acker mockAcker
// MaxSize: 1000, MaxTime: 100ms
tracker := newAckGroupingTracker(nil, nil,
func(id MessageID) { acker.ackCumulative(id) }, func(ids []*pb.MessageIdData) { acker.ack(ids) })
expected := make([]int64, 0)
for i := 0; i < 999; i++ {
tracker.add(&messageID{ledgerID: int64(i)})
expected = append(expected, int64(i))
}
assert.Equal(t, 0, len(acker.getLedgerIDs()))
// case 1: flush because the tracker timed out
assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) == 999 },
150*time.Millisecond, 10*time.Millisecond)
assert.Equal(t, expected, acker.getLedgerIDs())
// case 2: flush because cache is full
time.Sleep(50) // see case 3
acker.reset()
expected = append(expected, 999)
for i := 0; i < 1001; i++ {
tracker.add(&messageID{ledgerID: int64(i)})
}
assert.Equal(t, expected, acker.getLedgerIDs())
// case 3: flush will reset the timer
start := time.Now()
assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 1000 },
150*time.Millisecond, 10*time.Millisecond)
elapsed := time.Since(start)
assert.GreaterOrEqual(t, elapsed, int64(100), "elapsed", elapsed)
assert.Equal(t, append(expected, 1000), acker.getLedgerIDs())
}
func TestTimedTrackerCumulativeAck(t *testing.T) {
var acker mockAcker
// MaxTime is 100ms
tracker := newAckGroupingTracker(nil, nil, func(id MessageID) { acker.ackCumulative(id) }, nil)
// case 1: flush because of the timeout
tracker.addCumulative(&messageID{ledgerID: 1})
assert.NotEqual(t, int64(1), acker.getCumulativeLedgerID())
assert.Eventually(t, func() bool { return acker.getCumulativeLedgerID() == int64(1) },
150*time.Millisecond, 10*time.Millisecond)
assert.Equal(t, int64(1), acker.getCumulativeLedgerID())
// case 2: flush manually
tracker.addCumulative(&messageID{ledgerID: 2})
tracker.flush()
assert.Equal(t, int64(2), acker.getCumulativeLedgerID())
// case 3: older MessageID cannot be acknowledged
tracker.addCumulative(&messageID{ledgerID: 1})
tracker.flush()
assert.Equal(t, int64(2), acker.getCumulativeLedgerID())
}
func TestTimedTrackerIsDuplicate(t *testing.T) {
tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {},
func(id []*pb.MessageIdData) {})
tracker.add(&messageID{batchIdx: 0, batchSize: 3})
tracker.add(&messageID{batchIdx: 2, batchSize: 3})
assert.True(t, tracker.isDuplicate(&messageID{batchIdx: 0, batchSize: 3}))
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 1, batchSize: 3}))
assert.True(t, tracker.isDuplicate(&messageID{batchIdx: 2, batchSize: 3}))
tracker.flush()
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 0, batchSize: 3}))
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 1, batchSize: 3}))
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 2, batchSize: 3}))
}
func TestDuplicateAfterClose(t *testing.T) {
var acker mockAcker
tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil,
func(id MessageID) { acker.ackCumulative(id) }, func(ids []*pb.MessageIdData) { acker.ack(ids) })
tracker.add(&messageID{ledgerID: 1})
assert.True(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
tracker.close()
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
}
func TestTrackerPendingAcks(t *testing.T) {
m := make(map[uint64][]int64)
tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil, nil,
func(ids []*pb.MessageIdData) {
for _, id := range ids {
m[*id.LedgerId] = id.AckSet
}
})
tracker.add(&messageID{ledgerID: 0, batchIdx: 0, batchSize: 30})
for i := 0; i < 10; i++ {
tracker.add(&messageID{ledgerID: 1, batchIdx: int32(i), batchSize: 10})
}
assert.Equal(t, 0, len(m)) // the number of entries is 2, so it's not flushed
tracker.flush()
assert.Equal(t, 2, len(m))
ackSet, found := m[0]
assert.True(t, found)
assert.Greater(t, len(ackSet), 0)
ackSet, found = m[1]
assert.True(t, found)
assert.Equal(t, 0, len(ackSet)) // all messages in the batch are acknowledged
}