[ISSUE-328] gets last message when LatestMessageID and inclusive (#329)

Signed-off-by: Paulo Pereira <paulo.pereira@karhoo.com>

### Motivation

I have a service that when it restarts, it needs to know what was the last message successfully sent to pulsar.
A reader seems the logical place, since we can specify `StartMessageID` as `LatestMessageID()` and `StartMessageIDInclusive`

### Modifications

When the reader is created, verify if it startMessageIDInclusive true and startMessageID == lastestMessageID() and then get the last message id and seek to that message id.
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b3fc17c..04a36cd 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -76,6 +76,8 @@
 		Help:    "Time it takes for application to process messages",
 		Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
 	})
+
+	lastestMessageID = LatestMessageID()
 )
 
 type consumerState int
@@ -98,6 +100,10 @@
 	nonDurable
 )
 
+const (
+	noMessageEntry = -1
+)
+
 type partitionConsumerOpts struct {
 	topic                      string
 	consumerName               string
@@ -193,6 +199,21 @@
 	pc.log.Info("Created consumer")
 	pc.state = consumerReady
 
+	if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
+		msgID, err := pc.requestGetLastMessageID()
+		if err != nil {
+			return nil, err
+		}
+		if msgID.entryID != noMessageEntry {
+			pc.startMessageID = msgID
+
+			err = pc.requestSeek(msgID)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+
 	go pc.dispatcher()
 
 	go pc.runEventsLoop()
@@ -252,7 +273,10 @@
 
 func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
 	defer close(req.doneCh)
+	req.msgID, req.err = pc.requestGetLastMessageID()
+}
 
+func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
 	requestID := pc.client.rpcClient.NewRequestID()
 	cmdGetLastMessageID := &pb.CommandGetLastMessageId{
 		RequestId:  proto.Uint64(requestID),
@@ -262,11 +286,10 @@
 		pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
 	if err != nil {
 		pc.log.WithError(err).Error("Failed to get last message id")
-		req.err = err
-	} else {
-		id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
-		req.msgID = convertToMessageID(id)
+		return messageID{}, err
 	}
+	id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
+	return convertToMessageID(id), nil
 }
 
 func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
@@ -342,17 +365,20 @@
 
 func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
 	defer close(seek.doneCh)
+	seek.err = pc.requestSeek(seek.msgID)
+}
 
+func (pc *partitionConsumer) requestSeek(msgID messageID) error {
 	if pc.state == consumerClosing || pc.state == consumerClosed {
 		pc.log.Error("Consumer was already closed")
-		return
+		return nil
 	}
 
 	id := &pb.MessageIdData{}
-	err := proto.Unmarshal(seek.msgID.Serialize(), id)
+	err := proto.Unmarshal(msgID.Serialize(), id)
 	if err != nil {
 		pc.log.WithError(err).Errorf("deserialize message id error: %s", err.Error())
-		seek.err = err
+		return err
 	}
 
 	requestID := pc.client.rpcClient.NewRequestID()
@@ -365,8 +391,9 @@
 	_, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
 	if err != nil {
 		pc.log.WithError(err).Error("Failed to reset to message id")
-		seek.err = err
+		return err
 	}
+	return nil
 }
 
 func (pc *partitionConsumer) SeekByTime(time time.Time) error {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index d99bfcb..08b949e 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -446,3 +446,66 @@
 		assert.Equal(t, []byte(expectMsg), msg.Payload())
 	}
 }
+
+func TestReaderLatestInclusiveHasNext(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	ctx := context.Background()
+
+	// create reader on the last message (inclusive)
+	reader0, err := client.CreateReader(ReaderOptions{
+		Topic:                   topic,
+		StartMessageID:          LatestMessageID(),
+		StartMessageIDInclusive: true,
+	})
+
+	assert.Nil(t, err)
+	defer reader0.Close()
+
+	assert.False(t, reader0.HasNext())
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	var lastMsgID MessageID
+	for i := 0; i < 10; i++ {
+		lastMsgID, err = producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, lastMsgID)
+	}
+
+	// create reader on the last message (inclusive)
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:                   topic,
+		StartMessageID:          LatestMessageID(),
+		StartMessageIDInclusive: true,
+	})
+
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	var msgID MessageID
+	if reader.HasNext() {
+		msg, err := reader.Next(context.Background())
+		assert.NoError(t, err)
+
+		assert.Equal(t, []byte("hello-9"), msg.Payload())
+		msgID = msg.ID()
+	}
+
+	assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
+}