Add seek logic for reader (#356)

Signed-off-by: xiaolong.ran <rxl@apache.org>

### Motivation


Follow https://github.com/apache/pulsar-client-go/pull/222 and add the seek logic for reader 

### Modifications

- Add `seek by msgID` interface
- Add `seek by time` interface
- Add test case

### Verifying this change

- [x] Make sure that the change passes the CI checks.
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 04a36cd..4d10521 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -207,7 +207,7 @@
 		if msgID.entryID != noMessageEntry {
 			pc.startMessageID = msgID
 
-			err = pc.requestSeek(msgID)
+			err = pc.requestSeek(msgID.messageID)
 			if err != nil {
 				return nil, err
 			}
@@ -276,7 +276,7 @@
 	req.msgID, req.err = pc.requestGetLastMessageID()
 }
 
-func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
+func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error) {
 	requestID := pc.client.rpcClient.NewRequestID()
 	cmdGetLastMessageID := &pb.CommandGetLastMessageId{
 		RequestId:  proto.Uint64(requestID),
@@ -286,7 +286,7 @@
 		pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
 	if err != nil {
 		pc.log.WithError(err).Error("Failed to get last message id")
-		return messageID{}, err
+		return trackingMessageID{}, err
 	}
 	id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
 	return convertToMessageID(id), nil
@@ -365,7 +365,7 @@
 
 func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
 	defer close(seek.doneCh)
-	seek.err = pc.requestSeek(seek.msgID)
+	seek.err = pc.requestSeek(seek.msgID.messageID)
 }
 
 func (pc *partitionConsumer) requestSeek(msgID messageID) error {
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 8de1ad5..1bfec52 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -665,8 +665,8 @@
 }
 
 func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
-	c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId())
 	consumerID := closeConsumer.GetConsumerId()
+	c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
 
 	c.Lock()
 	defer c.Unlock()
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 8fe99f8..40234aa 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -17,7 +17,10 @@
 
 package pulsar
 
-import "context"
+import (
+	"context"
+	"time"
+)
 
 // ReaderMessage package Reader and Message as a struct to use
 type ReaderMessage struct {
@@ -88,4 +91,21 @@
 
 	// Close the reader and stop the broker to push more messages
 	Close()
+
+	// Reset the subscription associated with this reader to a specific message id.
+	// The message id can either be a specific message or represent the first or last messages in the topic.
+	//
+	// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
+	//       seek() on the individual partitions.
+	Seek(MessageID) error
+
+	// Reset the subscription associated with this reader to a specific message publish time.
+	//
+	// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+	// the individual partitions.
+	//
+	// @param timestamp
+	//            the message publish time where to reposition the subscription
+	//
+	SeekByTime(time time.Time) error
 }
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 474d0db..8083b06 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -20,6 +20,7 @@
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -45,6 +46,7 @@
 )
 
 type reader struct {
+	sync.Mutex
 	pc                  *partitionConsumer
 	messageCh           chan ConsumerMessage
 	lastMessageInBroker trackingMessageID
@@ -187,3 +189,39 @@
 	r.pc.Close()
 	readersClosed.Inc()
 }
+
+func (r *reader) messageID(msgID MessageID) (trackingMessageID, bool) {
+	mid, ok := toTrackingMessageID(msgID)
+	if !ok {
+		r.log.Warnf("invalid message id type %T", msgID)
+		return trackingMessageID{}, false
+	}
+
+	partition := int(mid.partitionIdx)
+	// did we receive a valid partition index?
+	if partition < 0 {
+		r.log.Warnf("invalid partition index %d expected", partition)
+		return trackingMessageID{}, false
+	}
+
+	return mid, true
+}
+
+func (r *reader) Seek(msgID MessageID) error {
+	r.Lock()
+	defer r.Unlock()
+
+	mid, ok := r.messageID(msgID)
+	if !ok {
+		return nil
+	}
+
+	return r.pc.Seek(mid)
+}
+
+func (r *reader) SeekByTime(time time.Time) error {
+	r.Lock()
+	defer r.Unlock()
+
+	return r.pc.SeekByTime(time)
+}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 08b949e..793dc8d 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -447,6 +447,65 @@
 	}
 }
 
+func TestReaderSeek(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          topicName,
+		StartMessageID: EarliestMessageID(),
+	})
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	const N = 10
+	var seekID MessageID
+	for i := 0; i < N; i++ {
+		id, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.Nil(t, err)
+
+		if i == 4 {
+			seekID = id
+		}
+	}
+	err = producer.Flush()
+	assert.NoError(t, err)
+
+	for i := 0; i < N; i++ {
+		msg, err := reader.Next(ctx)
+		assert.Nil(t, err)
+		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+	}
+
+	err = reader.Seek(seekID)
+	assert.Nil(t, err)
+
+	readerOfSeek, err := client.CreateReader(ReaderOptions{
+		Topic:                   topicName,
+		StartMessageID:          seekID,
+		StartMessageIDInclusive: true,
+	})
+	assert.Nil(t, err)
+
+	msg, err := readerOfSeek.Next(ctx)
+	assert.Nil(t, err)
+	assert.Equal(t, "hello-4", string(msg.Payload()))
+}
+
 func TestReaderLatestInclusiveHasNext(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
@@ -498,14 +557,10 @@
 	assert.Nil(t, err)
 	defer reader.Close()
 
-	var msgID MessageID
 	if reader.HasNext() {
 		msg, err := reader.Next(context.Background())
 		assert.NoError(t, err)
 
 		assert.Equal(t, []byte("hello-9"), msg.Payload())
-		msgID = msg.ID()
 	}
-
-	assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
 }