Clearing message queues after seek requests (#419)
### Motivation
Message queues should be cleared after seek requests (`Seek` and `SeekByTime`). If this is not performed messages that stay in message queues will be consumed before the sough message.
### Modifications
Cleaning `queueCh` and `messageCh` after successful seek request in partition_consumer.go.
### Verifying this change
- [x] Make sure that the change passes the CI checks.
This change is already covered by existing tests, such as `TestConsumerSeekByTime` and `TestConsumerSeek`.
This change added tests and can be verified as follows:
- Extended `TestConsumerSeekByTime` and `TestConsumerSeek` test for consuming correctly sought message even if some messages stay in message queues.
Signed-off-by: milos-matijasevic <milosmatijasevic2015@gmail.com>
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index d970f65..a787ffb 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -108,11 +108,12 @@
startMessageID trackingMessageID
lastDequeuedMsg trackingMessageID
- eventsCh chan interface{}
- connectedCh chan struct{}
- connectClosedCh chan connectionClosed
- closeCh chan struct{}
- clearQueueCh chan func(id trackingMessageID)
+ eventsCh chan interface{}
+ connectedCh chan struct{}
+ connectClosedCh chan connectionClosed
+ closeCh chan struct{}
+ clearQueueCh chan func(id trackingMessageID)
+ clearMessageQueuesCh chan chan struct{}
nackTracker *negativeAcksTracker
dlq *dlqRouter
@@ -144,6 +145,7 @@
connectClosedCh: make(chan connectionClosed, 10),
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id trackingMessageID)),
+ clearMessageQueuesCh: make(chan chan struct{}),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
metrics: metrics,
@@ -361,6 +363,7 @@
pc.log.WithError(err).Error("Failed to reset to message id")
return err
}
+ pc.clearMessageChannels()
return nil
}
@@ -395,7 +398,15 @@
if err != nil {
pc.log.WithError(err).Error("Failed to reset to message publish time")
seek.err = err
+ return
}
+ pc.clearMessageChannels()
+}
+
+func (pc *partitionConsumer) clearMessageChannels() {
+ doneCh := make(chan struct{})
+ pc.clearMessageQueuesCh <- doneCh
+ <-doneCh
}
func (pc *partitionConsumer) internalAck(req *ackRequest) {
@@ -659,6 +670,27 @@
}
clearQueueCb(nextMessageInQueue)
+
+ case doneCh := <-pc.clearMessageQueuesCh:
+ for len(pc.queueCh) > 0 {
+ <-pc.queueCh
+ }
+ for len(pc.messageCh) > 0 {
+ <-pc.messageCh
+ }
+ messages = nil
+
+ // reset available permits
+ pc.availablePermits = 0
+ initialPermits := uint32(pc.queueSize)
+
+ pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
+ // send initial permits
+ if err := pc.internalFlow(initialPermits); err != nil {
+ pc.log.WithError(err).Error("unable to send initial permits to broker")
+ }
+
+ close(doneCh)
}
}
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 82a1f97..f0b27c6 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -795,20 +795,22 @@
assert.Nil(t, err)
defer consumer.Close()
- const N = 10
+ // Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10
+ const N = 1100
var seekID MessageID
- for i := 0; i < 10; i++ {
+ for i := 0; i < N; i++ {
id, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
- if i == 4 {
+ if i == N-50 {
seekID = id
}
}
- for i := 0; i < N; i++ {
+ // Don't consume all messages so some stay in queues
+ for i := 0; i < N-20; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
@@ -820,7 +822,7 @@
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
- assert.Equal(t, "hello-4", string(msg.Payload()))
+ assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload()))
}
func TestConsumerSeekByTime(t *testing.T) {
@@ -847,19 +849,21 @@
assert.Nil(t, err)
defer consumer.Close()
- const N = 10
+ // Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10
+ const N = 1100
resetTimeStr := "100s"
retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
assert.Nil(t, err)
- for i := 0; i < 10; i++ {
+ for i := 0; i < N; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
- for i := 0; i < N; i++ {
+ // Don't consume all messages so some stay in queues
+ for i := 0; i < N-20; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))