| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package pulsar |
| |
| import ( |
| "context" |
| "fmt" |
| "log" |
| "testing" |
| "time" |
| |
| "github.com/docker/docker/api/types/container" |
| "github.com/docker/go-connections/nat" |
| "github.com/stretchr/testify/require" |
| "github.com/testcontainers/testcontainers-go" |
| "github.com/testcontainers/testcontainers-go/wait" |
| |
| "github.com/apache/pulsar-client-go/pulsar/internal" |
| "github.com/apache/pulsar-client-go/pulsaradmin" |
| "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" |
| "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" |
| "github.com/stretchr/testify/assert" |
| ) |
| |
| func TestRetryEnableZeroQueueConsumer(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topic := newTopicName() |
| |
| // create consumer |
| _, err = client.Subscribe(ConsumerOptions{ |
| Topic: topic, |
| SubscriptionName: "my-sub", |
| RetryEnable: true, |
| EnableZeroQueueConsumer: true, |
| }) |
| assert.ErrorContains(t, err, "ZeroQueueConsumer is not supported with RetryEnable") |
| } |
| |
| func TestNormalZeroQueueConsumer(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topic := newTopicName() |
| ctx := context.Background() |
| |
| // create consumer |
| consumer, err := client.Subscribe(ConsumerOptions{ |
| Topic: topic, |
| SubscriptionName: "my-sub", |
| EnableZeroQueueConsumer: true, |
| }) |
| assert.Nil(t, err) |
| _, ok := consumer.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer.Close() |
| |
| // create producer |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topic, |
| DisableBatching: false, |
| }) |
| assert.Nil(t, err) |
| defer producer.Close() |
| |
| // send 10 messages |
| for i := 0; i < 10; i++ { |
| msg, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("hello-%d", i)), |
| Key: "pulsar", |
| Properties: map[string]string{ |
| "key-1": "pulsar-1", |
| }, |
| }) |
| assert.Nil(t, err) |
| log.Printf("send message: %s", msg.String()) |
| } |
| |
| // receive 10 messages |
| for i := 0; i < 10; i++ { |
| msg, err := consumer.Receive(context.Background()) |
| if err != nil { |
| log.Fatal(err) |
| } |
| |
| expectMsg := fmt.Sprintf("hello-%d", i) |
| expectProperties := map[string]string{ |
| "key-1": "pulsar-1", |
| } |
| assert.Equal(t, []byte(expectMsg), msg.Payload()) |
| assert.Equal(t, "pulsar", msg.Key()) |
| assert.Equal(t, expectProperties, msg.Properties()) |
| // ack message |
| err = consumer.Ack(msg) |
| assert.Nil(t, err) |
| log.Printf("receive message: %s", msg.ID().String()) |
| } |
| err = consumer.Unsubscribe() |
| assert.Nil(t, err) |
| } |
| func TestReconnectConsumer(t *testing.T) { |
| |
| req := testcontainers.ContainerRequest{ |
| Name: "pulsar-test", |
| Image: getPulsarTestImage(), |
| ExposedPorts: []string{"6650/tcp", "8080/tcp"}, |
| WaitingFor: wait.ForExposedPort(), |
| HostConfigModifier: func(config *container.HostConfig) { |
| config.PortBindings = map[nat.Port][]nat.PortBinding{ |
| "6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6659"}}, |
| "8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8089"}}, |
| } |
| }, |
| Cmd: []string{"bin/pulsar", "standalone", "-nfw"}, |
| } |
| c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{ |
| ContainerRequest: req, |
| Started: true, |
| Reuse: true, |
| }) |
| require.NoError(t, err, "Failed to start the pulsar container") |
| endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") |
| require.NoError(t, err, "Failed to get the pulsar endpoint") |
| |
| client, err := NewClient(ClientOptions{ |
| URL: endpoint, |
| }) |
| assert.Nil(t, err) |
| adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", "http") |
| assert.Nil(t, err) |
| admin, err := pulsaradmin.NewClient(&config.Config{ |
| WebServiceURL: adminEndpoint, |
| }) |
| assert.Nil(t, err) |
| |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topic := newTopicName() |
| ctx := context.Background() |
| var consumer Consumer |
| require.Eventually(t, func() bool { |
| consumer, err = client.Subscribe(ConsumerOptions{ |
| Topic: topic, |
| SubscriptionName: "my-sub", |
| EnableZeroQueueConsumer: true, |
| }) |
| return err == nil |
| }, 30*time.Second, 1*time.Second) |
| |
| assert.Nil(t, err) |
| _, ok := consumer.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer.Close() |
| |
| // create producer |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topic, |
| DisableBatching: false, |
| }) |
| assert.Nil(t, err) |
| |
| // send 10 messages |
| for i := 0; i < 10; i++ { |
| msg, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("hello-%d", i)), |
| Key: "pulsar", |
| Properties: map[string]string{ |
| "key-1": "pulsar-1", |
| }, |
| }) |
| assert.Nil(t, err) |
| log.Printf("send message: %s", msg.String()) |
| } |
| |
| ch := make(chan struct{}) |
| |
| go func() { |
| time.Sleep(3 * time.Second) |
| log.Println("unloading topic") |
| // Simulate a broker restart by stopping the pulsar container |
| topicName, err := utils.GetTopicName(topic) |
| assert.Nil(t, err) |
| err = admin.Topics().Unload(*topicName) |
| assert.Nil(t, err) |
| log.Println("unloaded topic") |
| ch <- struct{}{} |
| }() |
| |
| // receive 10 messages |
| for i := 0; i < 10; i++ { |
| if i == 3 { |
| <-ch |
| } |
| msg, err := consumer.Receive(context.Background()) |
| if err != nil { |
| log.Fatal(err) |
| } |
| |
| expectMsg := fmt.Sprintf("hello-%d", i) |
| expectProperties := map[string]string{ |
| "key-1": "pulsar-1", |
| } |
| assert.Equal(t, []byte(expectMsg), msg.Payload()) |
| assert.Equal(t, "pulsar", msg.Key()) |
| assert.Equal(t, expectProperties, msg.Properties()) |
| // ack message |
| err = consumer.Ack(msg) |
| assert.Nil(t, err) |
| log.Printf("receive message: %s", msg.ID().String()) |
| } |
| err = consumer.Unsubscribe() |
| assert.Nil(t, err) |
| consumer.Close() |
| producer.Close() |
| defer c.Terminate(ctx) |
| } |
| |
| func TestMultipleConsumer(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| if err != nil { |
| log.Fatal(err) |
| } |
| defer client.Close() |
| |
| topic := newTopicName() |
| ctx := context.Background() |
| |
| // create consumer1 |
| consumer1, err := client.Subscribe(ConsumerOptions{ |
| Topic: topic, |
| SubscriptionName: "my-sub", |
| Type: Shared, |
| EnableZeroQueueConsumer: true, |
| }) |
| assert.Nil(t, err) |
| _, ok := consumer1.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer1.Close() |
| |
| // create consumer2 |
| consumer2, err := client.Subscribe(ConsumerOptions{ |
| Topic: topic, |
| SubscriptionName: "my-sub", |
| Type: Shared, |
| EnableZeroQueueConsumer: true, |
| }) |
| assert.Nil(t, err) |
| _, ok = consumer2.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer2.Close() |
| |
| // create producer |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topic, |
| DisableBatching: true, |
| }) |
| assert.Nil(t, err) |
| defer producer.Close() |
| |
| sendNum := 10 |
| // send 10 messages |
| for i := 0; i < sendNum; i++ { |
| msg, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("hello-%d", i)), |
| Key: "pulsar", |
| Properties: map[string]string{ |
| "key-1": "pulsar-1", |
| }, |
| }) |
| assert.Nil(t, err) |
| log.Printf("send message: %s", msg.String()) |
| } |
| |
| // receive messages |
| for i := 0; i < sendNum/2; i++ { |
| msg, err := consumer1.Receive(context.Background()) |
| if err != nil { |
| log.Fatal(err) |
| } |
| log.Printf("consumer1 receive message: %s %s", msg.ID().String(), msg.Payload()) |
| // ack message |
| consumer1.Ack(msg) |
| } |
| |
| // receive messages |
| for i := 0; i < sendNum/2; i++ { |
| msg, err := consumer2.Receive(context.Background()) |
| if err != nil { |
| log.Fatal(err) |
| } |
| log.Printf("consumer2 receive message: %s %s", msg.ID().String(), msg.Payload()) |
| // ack message |
| consumer2.Ack(msg) |
| } |
| |
| } |
| |
| func TestPartitionZeroQueueConsumer(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topic := newTopicName() |
| err = createPartitionedTopic(topic, 2) |
| assert.Nil(t, err) |
| |
| // create consumer |
| consumer, err := client.Subscribe(ConsumerOptions{ |
| Topic: topic, |
| SubscriptionName: "my-sub", |
| EnableZeroQueueConsumer: true, |
| }) |
| assert.Nil(t, consumer) |
| assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics") |
| } |
| func TestSpecifiedPartitionZeroQueueConsumer(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topic := newTopicName() |
| ctx := context.Background() |
| err = createPartitionedTopic(topic, 2) |
| assert.Nil(t, err) |
| topics, err := client.TopicPartitions(topic) |
| assert.Nil(t, err) |
| |
| // create consumer |
| consumer, err := client.Subscribe(ConsumerOptions{ |
| Topic: topics[1], |
| SubscriptionName: "my-sub", |
| EnableZeroQueueConsumer: true, |
| }) |
| assert.Nil(t, err) |
| _, ok := consumer.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer.Close() |
| |
| // create producer |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topics[1], |
| DisableBatching: false, |
| }) |
| assert.Nil(t, err) |
| defer producer.Close() |
| |
| // send 10 messages |
| for i := 0; i < 10; i++ { |
| msg, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("hello-%d", i)), |
| Key: "pulsar", |
| Properties: map[string]string{ |
| "key-1": "pulsar-1", |
| }, |
| }) |
| assert.Nil(t, err) |
| log.Printf("send message: %s", msg.String()) |
| } |
| |
| // receive 10 messages |
| for i := 0; i < 10; i++ { |
| msg, err := consumer.Receive(context.Background()) |
| if err != nil { |
| log.Fatal(err) |
| } |
| |
| expectMsg := fmt.Sprintf("hello-%d", i) |
| expectProperties := map[string]string{ |
| "key-1": "pulsar-1", |
| } |
| assert.Equal(t, []byte(expectMsg), msg.Payload()) |
| assert.Equal(t, "pulsar", msg.Key()) |
| assert.Equal(t, expectProperties, msg.Properties()) |
| // ack message |
| err = consumer.Ack(msg) |
| assert.Nil(t, err) |
| log.Printf("receive message: %s", msg.ID().String()) |
| } |
| err = consumer.Unsubscribe() |
| assert.Nil(t, err) |
| } |
| |
| func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| partition := 1 |
| topic := newTopicName() |
| // create consumer |
| consumer, err := client.Subscribe(ConsumerOptions{ |
| Topic: topic, |
| SubscriptionName: "my-sub", |
| Type: Shared, |
| EnableZeroQueueConsumer: true, |
| }) |
| _, ok := consumer.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| assert.Nil(t, err) |
| defer consumer.Close() |
| |
| // create producer |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topic, |
| DisableBatching: true, |
| }) |
| assert.Nil(t, err) |
| defer producer.Close() |
| |
| ctx := context.Background() |
| // send messages |
| totalMessage := 10 |
| for i := 0; i < totalMessage; i++ { |
| if _, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("hello-%d", i)), |
| }); err != nil { |
| assert.Nil(t, err) |
| } |
| } |
| |
| // create admin |
| admin, err := pulsaradmin.NewClient(&config.Config{}) |
| assert.Nil(t, err) |
| |
| messageIDs, err := consumer.GetLastMessageIDs() |
| assert.Nil(t, err) |
| assert.Equal(t, partition, len(messageIDs)) |
| |
| id := messageIDs[0] |
| topicName, err := utils.GetTopicName(id.Topic()) |
| assert.Nil(t, err) |
| messages, err := admin.Subscriptions().GetMessagesByID(*topicName, id.LedgerID(), id.EntryID()) |
| assert.Nil(t, err) |
| assert.Equal(t, 1, len(messages)) |
| err = consumer.UnsubscribeForce() |
| assert.Nil(t, err) |
| |
| } |
| |
| func TestZeroQueueConsumer_Chan(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topic := newTopicName() |
| ctx := context.Background() |
| |
| // create consumer |
| consumer, err := client.Subscribe(ConsumerOptions{ |
| Topic: topic, |
| SubscriptionName: "my-sub", |
| EnableZeroQueueConsumer: true, |
| }) |
| assert.Nil(t, err) |
| _, ok := consumer.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer.Close() |
| |
| // create producer |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topic, |
| DisableBatching: false, |
| }) |
| assert.Nil(t, err) |
| defer producer.Close() |
| |
| // send 10 messages |
| for i := 0; i < 10; i++ { |
| msg, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("hello-%d", i)), |
| }) |
| assert.Nil(t, err) |
| log.Printf("send message: %s", msg.String()) |
| } |
| assertPanic(t, "zeroQueueConsumer cannot support Chan method", func() { |
| consumer.Chan() |
| }) |
| } |
| |
// assertPanic runs f and records a test error unless f panics with a value
// that compares equal (using ==) to panicValue.
//
// Passing a nil panicValue asserts that f does not panic. panicValue must hold
// a comparable type: comparing two values of the same uncomparable dynamic
// type (such as a slice) panics at run time.
func assertPanic(t *testing.T, panicValue interface{}, f func()) {
	// Attribute failures to the caller instead of to this helper.
	t.Helper()
	defer func() {
		t.Helper()
		recovered := recover()
		switch {
		case recovered == panicValue:
			// f panicked with the expected value (or, for a nil expectation,
			// did not panic at all).
		case recovered == nil:
			t.Errorf("Expected panic %v, but the function did not panic", panicValue)
		default:
			t.Errorf("Expected panic %v, but got %v", panicValue, recovered)
		}
	}()
	f()
}
| |
| func TestZeroQueueConsumer_AckCumulativeConsumer(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topic := newTopicName() |
| ctx := context.Background() |
| |
| // create consumer |
| consumer, err := client.Subscribe(ConsumerOptions{ |
| Topic: topic, |
| SubscriptionName: "my-sub", |
| EnableZeroQueueConsumer: true, |
| }) |
| assert.Nil(t, err) |
| _, ok := consumer.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer.Close() |
| |
| // create producer |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topic, |
| DisableBatching: false, |
| }) |
| assert.Nil(t, err) |
| defer producer.Close() |
| |
| // send 10 messages |
| for i := 0; i < 10; i++ { |
| msg, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("hello-%d", i)), |
| }) |
| assert.Nil(t, err) |
| log.Printf("send message: %s", msg.String()) |
| } |
| |
| var lastID MessageID |
| // receive 10 messages |
| for i := 0; i < 10; i++ { |
| msg, err := consumer.Receive(context.Background()) |
| lastID = msg.ID() |
| if err != nil { |
| log.Fatal(err) |
| } |
| expectMsg := fmt.Sprintf("hello-%d", i) |
| assert.Equal(t, []byte(expectMsg), msg.Payload()) |
| } |
| err = consumer.AckIDCumulative(lastID) |
| assert.Nil(t, err) |
| |
| } |
| |
| func TestZeroQueueConsumer_Nack(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topicName := newTopicName() |
| ctx := context.Background() |
| |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topicName, |
| }) |
| assert.Nil(t, err) |
| defer producer.Close() |
| |
| consumer, err := client.Subscribe(ConsumerOptions{ |
| Topic: topicName, |
| SubscriptionName: "sub-1", |
| Type: Shared, |
| NackRedeliveryDelay: 1 * time.Second, |
| EnableZeroQueueConsumer: true, |
| }) |
| assert.Nil(t, err) |
| _, ok := consumer.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer.Close() |
| |
| const N = 100 |
| |
| for i := 0; i < N; i++ { |
| if _, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("msg-content-%d", i)), |
| }); err != nil { |
| t.Fatal(err) |
| } |
| } |
| |
| for i := 0; i < N; i++ { |
| msg, err := consumer.Receive(ctx) |
| assert.Nil(t, err) |
| assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) |
| |
| if i%2 == 0 { |
| // Only acks even messages |
| err = consumer.Ack(msg) |
| assert.Nil(t, err) |
| } else { |
| // Fails to process odd messages |
| consumer.Nack(msg) |
| } |
| } |
| |
| // Failed messages should be resent |
| |
| // We should only receive the odd messages |
| for i := 1; i < N; i += 2 { |
| msg, err := consumer.Receive(ctx) |
| assert.Nil(t, err) |
| assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) |
| |
| err = consumer.Ack(msg) |
| assert.Nil(t, err) |
| } |
| } |
| |
| func TestZeroQueueConsumer_Seek(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topicName := newTopicName() |
| ctx := context.Background() |
| |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topicName, |
| DisableBatching: false, |
| }) |
| assert.Nil(t, err) |
| defer producer.Close() |
| |
| consumer, err := client.Subscribe(ConsumerOptions{ |
| Topic: topicName, |
| EnableZeroQueueConsumer: true, |
| SubscriptionName: "sub-1", |
| StartMessageIDInclusive: true, |
| }) |
| assert.Nil(t, err) |
| _, ok := consumer.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer.Close() |
| |
| const N = 10 |
| var seekID MessageID |
| for i := 0; i < N; i++ { |
| id, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("hello-%d", i)), |
| }) |
| assert.Nil(t, err) |
| |
| if i == N-5 { |
| seekID = id |
| } |
| } |
| |
| // Don't consume all messages so some stay in queues |
| for i := 0; i < N; i++ { |
| msg, err := consumer.Receive(ctx) |
| assert.Nil(t, err) |
| assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) |
| err = consumer.Ack(msg) |
| assert.Nil(t, err) |
| } |
| |
| err = consumer.Seek(seekID) |
| assert.Nil(t, err) |
| |
| time.Sleep(time.Second * 3) |
| msg, err := consumer.Receive(ctx) |
| assert.Nil(t, err) |
| log.Printf("msg: %s", string(msg.Payload())) |
| assert.Equal(t, fmt.Sprintf("hello-%d", N-5), string(msg.Payload())) |
| } |
| |
| func TestZeroQueueConsumer_SeekByTime(t *testing.T) { |
| client, err := NewClient(ClientOptions{ |
| URL: lookupURL, |
| }) |
| assert.Nil(t, err) |
| defer client.Close() |
| |
| topicName := newTopicName() |
| ctx := context.Background() |
| |
| producer, err := client.CreateProducer(ProducerOptions{ |
| Topic: topicName, |
| DisableBatching: false, |
| }) |
| assert.Nil(t, err) |
| defer producer.Close() |
| |
| consumer, err := client.Subscribe(ConsumerOptions{ |
| Topic: topicName, |
| EnableZeroQueueConsumer: true, |
| SubscriptionName: "my-sub", |
| }) |
| assert.Nil(t, err) |
| _, ok := consumer.(*zeroQueueConsumer) |
| assert.True(t, ok) |
| defer consumer.Close() |
| |
| const N = 10 |
| resetTimeStr := "100s" |
| retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr) |
| assert.Nil(t, err) |
| |
| for i := 0; i < N; i++ { |
| _, err := producer.Send(ctx, &ProducerMessage{ |
| Payload: []byte(fmt.Sprintf("hello-%d", i)), |
| }) |
| assert.Nil(t, err) |
| } |
| |
| // Don't consume all messages so some stay in queues |
| for i := 0; i < N-2; i++ { |
| msg, err := consumer.Receive(ctx) |
| assert.Nil(t, err) |
| assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) |
| err = consumer.Ack(msg) |
| assert.Nil(t, err) |
| } |
| |
| currentTimestamp := time.Now() |
| err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond)) |
| assert.Nil(t, err) |
| |
| time.Sleep(time.Second * 3) |
| |
| for i := 0; i < N; i++ { |
| msg, err := consumer.Receive(ctx) |
| assert.Nil(t, err) |
| assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) |
| err = consumer.Ack(msg) |
| assert.Nil(t, err) |
| } |
| } |