Add seek by time (#180)
Signed-off-by: xiaolong.ran <rxl@apache.org>
### Motivation
- Add seek by time on consumer
### Modifications
- Add seek by time on consumer
- Add test case
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index e832b21..18eced8 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -174,4 +174,14 @@
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(MessageID) error
+
+ // Reset the subscription associated with this consumer to a specific message publish time.
+ //
+ // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+ // the individual partitions.
+ //
+ // @param timestamp
+ // the message publish time where to reposition the subscription
+ //
+ SeekByTime(time time.Time) error
}
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 095050f..da98b1d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -318,6 +318,14 @@
return c.consumers[mid.partitionIdx].Seek(mid)
}
+func (c *consumer) SeekByTime(time time.Time) error {
+ if len(c.consumers) > 1 {
+ return errors.New("for partition topic, seek command should perform on the individual partitions")
+ }
+
+ return c.consumers[0].SeekByTime(time)
+}
+
var r = &random{
R: rand.New(rand.NewSource(time.Now().UnixNano())),
}
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 6ba3a4a..7e9994d 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -22,6 +22,7 @@
"errors"
"fmt"
"sync"
+ "time"
pkgerrors "github.com/pkg/errors"
@@ -165,3 +166,7 @@
func (c *multiTopicConsumer) Seek(msgID MessageID) error {
return errors.New("seek command not allowed for multi topic consumer")
}
+
+func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
+ return errors.New("seek command not allowed for multi topic consumer")
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 185f351..c252e74 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -297,6 +297,40 @@
}
}
+func (pc *partitionConsumer) SeekByTime(time time.Time) error {
+ req := &seekByTimeRequest{
+ doneCh: make(chan struct{}),
+ publishTime: time,
+ }
+ pc.eventsCh <- req
+
+ // wait for the request to complete
+ <-req.doneCh
+ return req.err
+}
+
+func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
+ defer close(seek.doneCh)
+
+ if pc.state == consumerClosing || pc.state == consumerClosed {
+ pc.log.Error("Consumer was already closed")
+ return
+ }
+
+ requestID := pc.client.rpcClient.NewRequestID()
+ cmdSeek := &pb.CommandSeek{
+ ConsumerId: proto.Uint64(pc.consumerID),
+ RequestId: proto.Uint64(requestID),
+ MessagePublishTime: proto.Uint64(uint64(seek.publishTime.Unix())),
+ }
+
+ _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to reset to message publish time")
+ seek.err = err
+ }
+}
+
func (pc *partitionConsumer) internalAck(req *ackRequest) {
msgID := req.msgID
@@ -557,6 +591,12 @@
err error
}
+type seekByTimeRequest struct {
+ doneCh chan struct{}
+ publishTime time.Time
+ err error
+}
+
func (pc *partitionConsumer) runEventsLoop() {
defer func() {
pc.log.Debug("exiting events loop")
@@ -577,6 +617,8 @@
pc.internalGetLastMessageID(v)
case *seekRequest:
pc.internalSeek(v)
+ case *seekByTimeRequest:
+ pc.internalSeekByTime(v)
case *connectionClosed:
pc.reconnectToBroker()
case *closeRequest:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 4043380..1518af6 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -215,6 +215,10 @@
return errors.New("seek command not allowed for regex consumer")
}
+func (c *regexConsumer) SeekByTime(time time.Time) error {
+ return errors.New("seek command not allowed for regex consumer")
+}
+
func (c *regexConsumer) closed() bool {
select {
case <-c.closeCh:
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2ba76e8..d05b8ca 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -25,6 +25,7 @@
"testing"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/stretchr/testify/assert"
)
@@ -819,6 +820,61 @@
assert.Equal(t, "hello-4", string(msg.Payload()))
}
+func TestConsumerSeekByTime(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "my-sub",
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ const N = 10
+ resetTimeStr := "100s"
+ retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
+ assert.Nil(t, err)
+
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ for i := 0; i < N; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+ consumer.Ack(msg)
+ }
+
+ currentTimestamp := time.Now()
+ err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
+ assert.Nil(t, err)
+
+ for i := 0; i < N; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+ consumer.Ack(msg)
+ }
+}
+
func TestConsumerMetadata(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go
index f0d5fb3..0763ced 100644
--- a/pulsar/internal/utils.go
+++ b/pulsar/internal/utils.go
@@ -18,8 +18,12 @@
package internal
import (
+ "strconv"
+ "strings"
"sync/atomic"
"time"
+
+ "github.com/pkg/errors"
)
// TimestampMillis return a time unix nano.
@@ -36,3 +40,33 @@
}
}
}
+
+func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error) {
+ if relativeTime == "" {
+ return -1, errors.New("time can not be empty")
+ }
+
+ unitTime := relativeTime[len(relativeTime)-1:]
+ t := relativeTime[:len(relativeTime)-1]
+ timeValue, err := strconv.ParseInt(t, 10, 64)
+ if err != nil {
+ return -1, errors.Errorf("invalid time '%s'", t)
+ }
+
+ switch strings.ToLower(unitTime) {
+ case "s":
+ return time.Duration(timeValue) * time.Second, nil
+ case "m":
+ return time.Duration(timeValue) * time.Minute, nil
+ case "h":
+ return time.Duration(timeValue) * time.Hour, nil
+ case "d":
+ return time.Duration(timeValue) * time.Hour * 24, nil
+ case "w":
+ return time.Duration(timeValue) * time.Hour * 24 * 7, nil
+ case "y":
+ return time.Duration(timeValue) * time.Hour * 24 * 7 * 365, nil
+ default:
+ return -1, errors.Errorf("invalid time unit '%s'", unitTime)
+ }
+}
diff --git a/pulsar/internal/utils_test.go b/pulsar/internal/utils_test.go
new file mode 100644
index 0000000..65babd0
--- /dev/null
+++ b/pulsar/internal/utils_test.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestParseRelativeTimeInSeconds(t *testing.T) {
+ testSecondStr := "10s"
+ timestamp, err := ParseRelativeTimeInSeconds(testSecondStr)
+ assert.Nil(t, err)
+ assert.Equal(t, time.Duration(10)*time.Second, timestamp)
+
+ testMinuteStr := "10m"
+ timestamp, err = ParseRelativeTimeInSeconds(testMinuteStr)
+ assert.Nil(t, err)
+ assert.Equal(t, time.Duration(10)*time.Minute, timestamp)
+
+ testHourStr := "10h"
+ timestamp, err = ParseRelativeTimeInSeconds(testHourStr)
+ assert.Nil(t, err)
+ assert.Equal(t, time.Duration(10)*time.Hour, timestamp)
+
+ testDaysStr := "10d"
+ timestamp, err = ParseRelativeTimeInSeconds(testDaysStr)
+ assert.Nil(t, err)
+ assert.Equal(t, time.Duration(10)*time.Hour*24, timestamp)
+
+ testWeekStr := "10w"
+ timestamp, err = ParseRelativeTimeInSeconds(testWeekStr)
+ assert.Nil(t, err)
+ assert.Equal(t, time.Duration(10)*time.Hour*24*7, timestamp)
+
+ testYearStr := "10y"
+ timestamp, err = ParseRelativeTimeInSeconds(testYearStr)
+ assert.Nil(t, err)
+ assert.Equal(t, time.Duration(10)*time.Hour*24*7*365, timestamp)
+}