[bug] Fix reader latest position (#525)
* Fix reader latest position
Currently, using reader.latest fails because we discard
all messages less than the startPositionID, this causes an issue with
the reader as we filter all messages as all messages are less than
latest message id
* fix lint
* Fix Lint
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 031e0a3..94a914d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -575,6 +575,10 @@
if pc.startMessageID.Undefined() {
return false
}
+ // if we start at latest message, we should never discard
+ if pc.options.startMessageID.equal(latestMessageID) {
+ return false
+ }
if pc.options.startMessageIDInclusive {
return pc.startMessageID.greater(msgID.messageID)
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index c358c22..a84fc2d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -19,6 +19,7 @@
import (
"fmt"
+ "math"
"math/big"
"strings"
"sync"
@@ -36,6 +37,20 @@
partitionIdx int32
}
+var latestMessageID = messageID{
+ ledgerID: math.MaxInt64,
+ entryID: math.MaxInt64,
+ batchIdx: -1,
+ partitionIdx: -1,
+}
+
+var earliestMessageID = messageID{
+ ledgerID: -1,
+ entryID: -1,
+ batchIdx: -1,
+ partitionIdx: -1,
+}
+
type trackingMessageID struct {
messageID
diff --git a/pulsar/message.go b/pulsar/message.go
index 397c51e..55ecbcc 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -18,7 +18,6 @@
package pulsar
import (
- "math"
"time"
)
@@ -129,10 +128,10 @@
// EarliestMessageID returns a messageID that points to the earliest message available in a topic
func EarliestMessageID() MessageID {
- return newMessageID(-1, -1, -1, -1)
+ return earliestMessageID
}
// LatestMessage returns a messageID that points to the latest message
func LatestMessageID() MessageID {
- return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)
+ return latestMessageID
}