commit | 0e48f3b53ee21f0b2807662bb7ecdc955aa181ae | [log] [tgz] |
---|---|---|
author | hrsakai <hsakai@yahoo-corp.jp> | Sat Dec 12 17:47:45 2020 +0900 |
committer | GitHub <noreply@github.com> | Sat Dec 12 16:47:45 2020 +0800 |
tree | a1261b642d44b48857a6123d13ef1bc2fe97b4de | |
parent | 0296890a9136a0921da19442cf713a4ebb31f9b0 [diff] |
Fix discard unacked messages (#413) https://github.com/apache/pulsar-client-go/issues/389#issuecomment-741560624 ### Motivation When a consumer whose receive queue is not empty reconnects to a broker, unacked messages don't be redelivered to a client application and these are removed from backlog. This is because StartMessageID is updated and the consumer implicitly acks redelivered unacked messages. https://github.com/apache/pulsar-client-go/blob/v0.3.0/pulsar/consumer_partition.go#L972-L976 https://github.com/apache/pulsar-client-go/blob/v0.3.0/pulsar/consumer_partition.go#L507-L510 ### Modifications * If startMessageID is undefined, it will not be updated in order to prevent unacked messages from being discarded by following logic. .https://github.com/apache/pulsar-client-go/blob/v0.3.0/pulsar/consumer_partition.go#L507-L510
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index d34e0eb..285cf29 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go
@@ -931,6 +931,10 @@ func (pc *partitionConsumer) clearReceiverQueue() trackingMessageID { nextMessageInQueue := pc.clearQueueAndGetNextMessage() + if pc.startMessageID.Undefined() { + return pc.startMessageID + } + if !nextMessageInQueue.Undefined() { return getPreviousMessage(nextMessageInQueue) } else if !pc.lastDequeuedMsg.Undefined() {