Add seek by time (#180)

Signed-off-by: xiaolong.ran <rxl@apache.org>

### Motivation

- Add seek by time on consumer

### Modifications

- Add seek by time on consumer
- Add test case
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index e832b21..18eced8 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -174,4 +174,14 @@
 	// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
 	//       seek() on the individual partitions.
 	Seek(MessageID) error
+
+	// Reset the subscription associated with this consumer to a specific message publish time.
+	//
+	// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+	// the individual partitions.
+	//
+	// @param timestamp
+	//            the message publish time where to reposition the subscription
+	//
+	SeekByTime(time time.Time) error
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 095050f..da98b1d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -318,6 +318,14 @@
 	return c.consumers[mid.partitionIdx].Seek(mid)
 }
 
+func (c *consumer) SeekByTime(time time.Time) error {
+	if len(c.consumers) > 1 {
+		return errors.New("for partition topic, seek command should perform on the individual partitions")
+	}
+
+	return c.consumers[0].SeekByTime(time)
+}
+
 var r = &random{
 	R: rand.New(rand.NewSource(time.Now().UnixNano())),
 }
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 6ba3a4a..7e9994d 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -22,6 +22,7 @@
 	"errors"
 	"fmt"
 	"sync"
+	"time"
 
 	pkgerrors "github.com/pkg/errors"
 
@@ -165,3 +166,7 @@
 func (c *multiTopicConsumer) Seek(msgID MessageID) error {
 	return errors.New("seek command not allowed for multi topic consumer")
 }
+
+func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
+	return errors.New("seek command not allowed for multi topic consumer")
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 185f351..c252e74 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -297,6 +297,40 @@
 	}
 }
 
+func (pc *partitionConsumer) SeekByTime(time time.Time) error {
+	req := &seekByTimeRequest{
+		doneCh:      make(chan struct{}),
+		publishTime: time,
+	}
+	pc.eventsCh <- req
+
+	// wait for the request to complete
+	<-req.doneCh
+	return req.err
+}
+
+func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
+	defer close(seek.doneCh)
+
+	if pc.state == consumerClosing || pc.state == consumerClosed {
+		pc.log.Error("Consumer was already closed")
+		return
+	}
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	cmdSeek := &pb.CommandSeek{
+		ConsumerId:         proto.Uint64(pc.consumerID),
+		RequestId:          proto.Uint64(requestID),
+		MessagePublishTime: proto.Uint64(uint64(seek.publishTime.Unix())),
+	}
+
+	_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to reset to message publish time")
+		seek.err = err
+	}
+}
+
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
 	msgID := req.msgID
 
@@ -557,6 +591,12 @@
 	err    error
 }
 
+type seekByTimeRequest struct {
+	doneCh      chan struct{}
+	publishTime time.Time
+	err         error
+}
+
 func (pc *partitionConsumer) runEventsLoop() {
 	defer func() {
 		pc.log.Debug("exiting events loop")
@@ -577,6 +617,8 @@
 				pc.internalGetLastMessageID(v)
 			case *seekRequest:
 				pc.internalSeek(v)
+			case *seekByTimeRequest:
+				pc.internalSeekByTime(v)
 			case *connectionClosed:
 				pc.reconnectToBroker()
 			case *closeRequest:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 4043380..1518af6 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -215,6 +215,10 @@
 	return errors.New("seek command not allowed for regex consumer")
 }
 
+func (c *regexConsumer) SeekByTime(time time.Time) error {
+	return errors.New("seek command not allowed for regex consumer")
+}
+
 func (c *regexConsumer) closed() bool {
 	select {
 	case <-c.closeCh:
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2ba76e8..d05b8ca 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -25,6 +25,7 @@
 	"testing"
 	"time"
 
+	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -819,6 +820,61 @@
 	assert.Equal(t, "hello-4", string(msg.Payload()))
 }
 
+func TestConsumerSeekByTime(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topicName,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "my-sub",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	const N = 10
+	resetTimeStr := "100s"
+	retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
+	assert.Nil(t, err)
+
+	for i := 0; i < 10; i++ {
+		_, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.Nil(t, err)
+	}
+
+	for i := 0; i < N; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+		consumer.Ack(msg)
+	}
+
+	currentTimestamp := time.Now()
+	err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
+	assert.Nil(t, err)
+
+	for i := 0; i < N; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+		consumer.Ack(msg)
+	}
+}
+
 func TestConsumerMetadata(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go
index f0d5fb3..0763ced 100644
--- a/pulsar/internal/utils.go
+++ b/pulsar/internal/utils.go
@@ -18,8 +18,12 @@
 package internal
 
 import (
+	"strconv"
+	"strings"
 	"sync/atomic"
 	"time"
+
+	"github.com/pkg/errors"
 )
 
 // TimestampMillis return a time unix nano.
@@ -36,3 +40,33 @@
 		}
 	}
 }
+
+func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error) {
+	if relativeTime == "" {
+		return -1, errors.New("time can not be empty")
+	}
+
+	unitTime := relativeTime[len(relativeTime)-1:]
+	t := relativeTime[:len(relativeTime)-1]
+	timeValue, err := strconv.ParseInt(t, 10, 64)
+	if err != nil {
+		return -1, errors.Errorf("invalid time '%s'", t)
+	}
+
+	switch strings.ToLower(unitTime) {
+	case "s":
+		return time.Duration(timeValue) * time.Second, nil
+	case "m":
+		return time.Duration(timeValue) * time.Minute, nil
+	case "h":
+		return time.Duration(timeValue) * time.Hour, nil
+	case "d":
+		return time.Duration(timeValue) * time.Hour * 24, nil
+	case "w":
+		return time.Duration(timeValue) * time.Hour * 24 * 7, nil
+	case "y":
+		return time.Duration(timeValue) * time.Hour * 24 * 7 * 365, nil
+	default:
+		return -1, errors.Errorf("invalid time unit '%s'", unitTime)
+	}
+}
diff --git a/pulsar/internal/utils_test.go b/pulsar/internal/utils_test.go
new file mode 100644
index 0000000..65babd0
--- /dev/null
+++ b/pulsar/internal/utils_test.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestParseRelativeTimeInSeconds(t *testing.T) {
+	testSecondStr := "10s"
+	timestamp, err := ParseRelativeTimeInSeconds(testSecondStr)
+	assert.Nil(t, err)
+	assert.Equal(t, time.Duration(10)*time.Second, timestamp)
+
+	testMinuteStr := "10m"
+	timestamp, err = ParseRelativeTimeInSeconds(testMinuteStr)
+	assert.Nil(t, err)
+	assert.Equal(t, time.Duration(10)*time.Minute, timestamp)
+
+	testHourStr := "10h"
+	timestamp, err = ParseRelativeTimeInSeconds(testHourStr)
+	assert.Nil(t, err)
+	assert.Equal(t, time.Duration(10)*time.Hour, timestamp)
+
+	testDaysStr := "10d"
+	timestamp, err = ParseRelativeTimeInSeconds(testDaysStr)
+	assert.Nil(t, err)
+	assert.Equal(t, time.Duration(10)*time.Hour*24, timestamp)
+
+	testWeekStr := "10w"
+	timestamp, err = ParseRelativeTimeInSeconds(testWeekStr)
+	assert.Nil(t, err)
+	assert.Equal(t, time.Duration(10)*time.Hour*24*7, timestamp)
+
+	testYearStr := "10y"
+	timestamp, err = ParseRelativeTimeInSeconds(testYearStr)
+	assert.Nil(t, err)
+	assert.Equal(t, time.Duration(10)*time.Hour*24*7*365, timestamp)
+}