[ISSUE-328] gets last message when LatestMessageID and inclusive (#329)
Signed-off-by: Paulo Pereira <paulo.pereira@karhoo.com>
### Motivation
I have a service that when it restarts, it needs to know what was the last message successfully sent to pulsar.
A reader seems the logical place, since we can specify `StartMessageID` as `LatestMessageID()` and `StartMessageIDInclusive`
### Modifications
When the reader is created, verify if it startMessageIDInclusive true and startMessageID == lastestMessageID() and then get the last message id and seek to that message id.
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b3fc17c..04a36cd 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -76,6 +76,8 @@
Help: "Time it takes for application to process messages",
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
})
+
+ lastestMessageID = LatestMessageID()
)
type consumerState int
@@ -98,6 +100,10 @@
nonDurable
)
+const (
+ noMessageEntry = -1
+)
+
type partitionConsumerOpts struct {
topic string
consumerName string
@@ -193,6 +199,21 @@
pc.log.Info("Created consumer")
pc.state = consumerReady
+ if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
+ msgID, err := pc.requestGetLastMessageID()
+ if err != nil {
+ return nil, err
+ }
+ if msgID.entryID != noMessageEntry {
+ pc.startMessageID = msgID
+
+ err = pc.requestSeek(msgID)
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
go pc.dispatcher()
go pc.runEventsLoop()
@@ -252,7 +273,10 @@
func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
defer close(req.doneCh)
+ req.msgID, req.err = pc.requestGetLastMessageID()
+}
+func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
requestID := pc.client.rpcClient.NewRequestID()
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
RequestId: proto.Uint64(requestID),
@@ -262,11 +286,10 @@
pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id")
- req.err = err
- } else {
- id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
- req.msgID = convertToMessageID(id)
+ return messageID{}, err
}
+ id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
+ return convertToMessageID(id), nil
}
func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
@@ -342,17 +365,20 @@
func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
defer close(seek.doneCh)
+ seek.err = pc.requestSeek(seek.msgID)
+}
+func (pc *partitionConsumer) requestSeek(msgID messageID) error {
if pc.state == consumerClosing || pc.state == consumerClosed {
pc.log.Error("Consumer was already closed")
- return
+ return nil
}
id := &pb.MessageIdData{}
- err := proto.Unmarshal(seek.msgID.Serialize(), id)
+ err := proto.Unmarshal(msgID.Serialize(), id)
if err != nil {
pc.log.WithError(err).Errorf("deserialize message id error: %s", err.Error())
- seek.err = err
+ return err
}
requestID := pc.client.rpcClient.NewRequestID()
@@ -365,8 +391,9 @@
_, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
if err != nil {
pc.log.WithError(err).Error("Failed to reset to message id")
- seek.err = err
+ return err
}
+ return nil
}
func (pc *partitionConsumer) SeekByTime(time time.Time) error {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index d99bfcb..08b949e 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -446,3 +446,66 @@
assert.Equal(t, []byte(expectMsg), msg.Payload())
}
}
+
+func TestReaderLatestInclusiveHasNext(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ // create reader on the last message (inclusive)
+ reader0, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: LatestMessageID(),
+ StartMessageIDInclusive: true,
+ })
+
+ assert.Nil(t, err)
+ defer reader0.Close()
+
+ assert.False(t, reader0.HasNext())
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ var lastMsgID MessageID
+ for i := 0; i < 10; i++ {
+ lastMsgID, err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, lastMsgID)
+ }
+
+ // create reader on the last message (inclusive)
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: LatestMessageID(),
+ StartMessageIDInclusive: true,
+ })
+
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ var msgID MessageID
+ if reader.HasNext() {
+ msg, err := reader.Next(context.Background())
+ assert.NoError(t, err)
+
+ assert.Equal(t, []byte("hello-9"), msg.Payload())
+ msgID = msg.ID()
+ }
+
+ assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
+}