Clearing message queues after seek requests (#419)

### Motivation

Message queues should be cleared after seek requests (`Seek` and `SeekByTime`). If this is not done, messages remaining in the message queues will be consumed before the sought message.

### Modifications

Clearing `queueCh` and `messageCh` after a successful seek request in partition_consumer.go.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as `TestConsumerSeekByTime` and `TestConsumerSeek`.

This change added tests and can be verified as follows:

  - Extended the `TestConsumerSeekByTime` and `TestConsumerSeek` tests to verify that the sought message is consumed correctly even when some messages remain in the message queues.


Signed-off-by: milos-matijasevic <milosmatijasevic2015@gmail.com>
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index d970f65..a787ffb 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -108,11 +108,12 @@
 	startMessageID  trackingMessageID
 	lastDequeuedMsg trackingMessageID
 
-	eventsCh        chan interface{}
-	connectedCh     chan struct{}
-	connectClosedCh chan connectionClosed
-	closeCh         chan struct{}
-	clearQueueCh    chan func(id trackingMessageID)
+	eventsCh             chan interface{}
+	connectedCh          chan struct{}
+	connectClosedCh      chan connectionClosed
+	closeCh              chan struct{}
+	clearQueueCh         chan func(id trackingMessageID)
+	clearMessageQueuesCh chan chan struct{}
 
 	nackTracker *negativeAcksTracker
 	dlq         *dlqRouter
@@ -144,6 +145,7 @@
 		connectClosedCh:      make(chan connectionClosed, 10),
 		closeCh:              make(chan struct{}),
 		clearQueueCh:         make(chan func(id trackingMessageID)),
+		clearMessageQueuesCh: make(chan chan struct{}),
 		compressionProviders: make(map[pb.CompressionType]compression.Provider),
 		dlq:                  dlq,
 		metrics:              metrics,
@@ -361,6 +363,7 @@
 		pc.log.WithError(err).Error("Failed to reset to message id")
 		return err
 	}
+	pc.clearMessageChannels()
 	return nil
 }
 
@@ -395,7 +398,15 @@
 	if err != nil {
 		pc.log.WithError(err).Error("Failed to reset to message publish time")
 		seek.err = err
+		return
 	}
+	pc.clearMessageChannels()
+}
+
+func (pc *partitionConsumer) clearMessageChannels() {
+	doneCh := make(chan struct{})
+	pc.clearMessageQueuesCh <- doneCh
+	<-doneCh
 }
 
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
@@ -659,6 +670,27 @@
 			}
 
 			clearQueueCb(nextMessageInQueue)
+
+		case doneCh := <-pc.clearMessageQueuesCh:
+			for len(pc.queueCh) > 0 {
+				<-pc.queueCh
+			}
+			for len(pc.messageCh) > 0 {
+				<-pc.messageCh
+			}
+			messages = nil
+
+			// reset available permits
+			pc.availablePermits = 0
+			initialPermits := uint32(pc.queueSize)
+
+			pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
+			// send initial permits
+			if err := pc.internalFlow(initialPermits); err != nil {
+				pc.log.WithError(err).Error("unable to send initial permits to broker")
+			}
+
+			close(doneCh)
 		}
 	}
 }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 82a1f97..f0b27c6 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -795,20 +795,22 @@
 	assert.Nil(t, err)
 	defer consumer.Close()
 
-	const N = 10
+	// Use a value bigger than 1000 to fill the queue channel (size 1000) and the message channel (size 10)
+	const N = 1100
 	var seekID MessageID
-	for i := 0; i < 10; i++ {
+	for i := 0; i < N; i++ {
 		id, err := producer.Send(ctx, &ProducerMessage{
 			Payload: []byte(fmt.Sprintf("hello-%d", i)),
 		})
 		assert.Nil(t, err)
 
-		if i == 4 {
+		if i == N-50 {
 			seekID = id
 		}
 	}
 
-	for i := 0; i < N; i++ {
+	// Don't consume all messages so some stay in queues
+	for i := 0; i < N-20; i++ {
 		msg, err := consumer.Receive(ctx)
 		assert.Nil(t, err)
 		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
@@ -820,7 +822,7 @@
 
 	msg, err := consumer.Receive(ctx)
 	assert.Nil(t, err)
-	assert.Equal(t, "hello-4", string(msg.Payload()))
+	assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload()))
 }
 
 func TestConsumerSeekByTime(t *testing.T) {
@@ -847,19 +849,21 @@
 	assert.Nil(t, err)
 	defer consumer.Close()
 
-	const N = 10
+	// Use a value bigger than 1000 to fill the queue channel (size 1000) and the message channel (size 10)
+	const N = 1100
 	resetTimeStr := "100s"
 	retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
 	assert.Nil(t, err)
 
-	for i := 0; i < 10; i++ {
+	for i := 0; i < N; i++ {
 		_, err := producer.Send(ctx, &ProducerMessage{
 			Payload: []byte(fmt.Sprintf("hello-%d", i)),
 		})
 		assert.Nil(t, err)
 	}
 
-	for i := 0; i < N; i++ {
+	// Don't consume all messages so some stay in queues
+	for i := 0; i < N-20; i++ {
 		msg, err := consumer.Receive(ctx)
 		assert.Nil(t, err)
 		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))