// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
plog "github.com/apache/pulsar-client-go/pulsar/log"
"github.com/google/uuid"
"github.com/pierrec/lz4"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
)
var (
adminURL = "http://localhost:8080"
lookupURL = "pulsar://localhost:6650"
)
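// TestProducerConsumer covers the basic round trip: 10 keyed messages with properties are
// published with batching enabled and received in order on an exclusive subscription.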
func TestProducerConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "my-topic"
ctx := context.Background()
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
}); err != nil {
log.Fatal(err)
}
}
// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
consumer.Ack(msg)
}
}
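// TestConsumerConnectError verifies that subscribing through a client pointed at an
// unreachable broker fails with a connection error.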
func TestConsumerConnectError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://invalid-hostname:6650",
})
assert.Nil(t, err)
defer client.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-subscription",
})
// Expect error in creating consumer
assert.Nil(t, consumer)
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "connection error")
}
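// TestBatchMessageReceive verifies that 100 messages published through a producer batching
// up to 2 messages per batch are all received and acked.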
func TestBatchMessageReceive(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := "persistent://public/default/receive-batch"
subName := "subscription-name"
prefix := "msg-batch-"
ctx := context.Background()
// Enable batching on producer side
batchSize, numOfMessages := 2, 100
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
BatchingMaxMessages: uint(batchSize),
DisableBatching: false,
})
assert.Nil(t, err)
assert.Equal(t, topicName, producer.Topic())
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
assert.Nil(t, err)
defer consumer.Close()
count := 0
for i := 0; i < numOfMessages; i++ {
messageContent := prefix + fmt.Sprintf("%d", i)
msg := &ProducerMessage{
Payload: []byte(messageContent),
}
_, err := producer.Send(ctx, msg)
assert.Nil(t, err)
}
for i := 0; i < numOfMessages; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
consumer.Ack(msg)
count++
}
assert.Equal(t, count, numOfMessages)
}
func TestConsumerWithInvalidConf(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
if err != nil {
t.Fatal(err)
return
}
defer client.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: "my-topic",
})
// Expect error in creating consumer
assert.Nil(t, consumer)
assert.NotNil(t, err)
fmt.Println(err.Error())
assert.Equal(t, err.(*Error).Result(), SubscriptionNotFound)
consumer, err = client.Subscribe(ConsumerOptions{
SubscriptionName: "my-subscription",
})
// Expect error in creating consumer
assert.Nil(t, consumer)
assert.NotNil(t, err)
assert.Equal(t, err.(*Error).Result(), TopicNotFound)
}
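// TestConsumerSubscriptionEarliestPosition verifies that a subscription created with
// SubscriptionPositionEarliest after messages were published starts from the first message.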
func TestConsumerSubscriptionEarliestPosition(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := fmt.Sprintf("testSeek-%d", time.Now().Unix())
subName := "test-subscription-initial-earliest-position"
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
// send message
ctx := context.Background()
_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("msg-1-content-1"),
})
assert.Nil(t, err)
_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("msg-1-content-2"),
})
assert.Nil(t, err)
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer consumer.Close()
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
}
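// TestConsumerKeyShared verifies that two KeyShared consumers on the same subscription
// split 100 messages spread over 3 keys, with both receiving a non-empty share.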
func TestConsumerKeyShared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/test-topic-6"
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer1.Close()
consumer2, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer2.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()
ctx := context.Background()
for i := 0; i < 100; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Key: fmt.Sprintf("key-shared-%d", i%3),
Payload: []byte(fmt.Sprintf("value-%d", i)),
})
assert.Nil(t, err)
}
receivedConsumer1 := 0
receivedConsumer2 := 0
for (receivedConsumer1 + receivedConsumer2) < 100 {
select {
case cm, ok := <-consumer1.Chan():
if !ok {
break
}
receivedConsumer1++
consumer1.Ack(cm.Message)
case cm, ok := <-consumer2.Chan():
if !ok {
break
}
receivedConsumer2++
consumer2.Ack(cm.Message)
}
}
assert.NotEqual(t, 0, receivedConsumer1)
assert.NotEqual(t, 0, receivedConsumer2)
t.Logf("TestConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
receivedConsumer1, receivedConsumer2)
assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
}
func TestPartitionTopicsConsumerPubSub(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/testGetPartitions5"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions5/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "64")
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
defer producer.Close()
topics, err := client.TopicPartitions(topic)
assert.Nil(t, err)
assert.Equal(t, topic+"-partition-0", topics[0])
assert.Equal(t, topic+"-partition-1", topics[1])
assert.Equal(t, topic+"-partition-2", topics[2])
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
ReceiverQueueSize: 10,
})
assert.Nil(t, err)
defer consumer.Close()
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
msgs := make([]string, 0)
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
msgs = append(msgs, string(msg.Payload()))
t.Logf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
assert.Equal(t, len(msgs), 10)
}
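// TestActiveConsumerListener records, per consumer name, the set of partitions the consumer
// is currently active on; the Failover tests use it to assert how the broker rebalances.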
type TestActiveConsumerListener struct {
t *testing.T
lock sync.RWMutex
nameToPartitions map[string]map[int32]struct{}
}
func (l *TestActiveConsumerListener) getConsumerCount() int {
l.lock.RLock()
defer l.lock.RUnlock()
return len(l.nameToPartitions)
}
func (l *TestActiveConsumerListener) getPartitionCount(consumerName string) int {
l.lock.RLock()
defer l.lock.RUnlock()
return len(l.nameToPartitions[consumerName])
}
func (l *TestActiveConsumerListener) BecameActive(consumer Consumer, topicName string, partition int32) {
l.t.Logf("%s become active on %s - %d\n", consumer.Name(), topicName, partition)
l.lock.Lock()
defer l.lock.Unlock()
partitionSet := l.nameToPartitions[consumer.Name()]
if partitionSet == nil {
partitionSet = map[int32]struct{}{}
}
partitionSet[partition] = struct{}{}
l.nameToPartitions[consumer.Name()] = partitionSet
}
func (l *TestActiveConsumerListener) BecameInactive(consumer Consumer, topicName string, partition int32) {
l.t.Logf("%s become inactive on %s - %d\n", consumer.Name(), topicName, partition)
l.lock.Lock()
defer l.lock.Unlock()
partitionSet := l.nameToPartitions[consumer.Name()]
if _, ok := partitionSet[partition]; ok {
delete(partitionSet, partition)
if len(partitionSet) == 0 {
delete(l.nameToPartitions, consumer.Name())
}
}
}
// allConsume performs one Receive with a short timeout on every consumer,
// giving the broker a chance to dispatch messages and report active-consumer changes.
func allConsume(consumers []Consumer) {
for i := 0; i < len(consumers); i++ {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
consumers[i].Receive(ctx)
cancel()
}
}
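// TestPartitionTopic_ActiveConsumerChanged verifies active-consumer notifications on a
// 3-partition Failover subscription: a single consumer owns all partitions, adding consumers
// rebalances to one partition each, and closing one redistributes its partition.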
func TestPartitionTopic_ActiveConsumerChanged(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + randomName + "/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "3")
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
defer producer.Close()
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
var consumers []Consumer
listener := &TestActiveConsumerListener{
t: t,
nameToPartitions: map[string]map[int32]struct{}{},
}
for i := 0; i < 1; i++ {
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Name: fmt.Sprintf("consumer-%d", i),
SubscriptionName: "my-sub",
Type: Failover,
EventListener: listener,
})
assert.Nil(t, err)
defer consumer.Close()
consumers = append(consumers, consumer)
}
allConsume(consumers)
// first consumer will get 3 partitions
assert.Equal(t, 1, listener.getConsumerCount())
assert.Equal(t, 3, listener.getPartitionCount(consumers[0].Name()))
// 1 partition per consumer
for i := 1; i < 3; i++ {
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Name: fmt.Sprintf("consumer-%d", i),
SubscriptionName: "my-sub",
Type: Failover,
EventListener: listener,
})
assert.Nil(t, err)
defer consumer.Close()
consumers = append(consumers, consumer)
}
allConsume(consumers)
assert.Equal(t, 3, listener.getConsumerCount())
for _, c := range consumers {
assert.Equal(t, 1, listener.getPartitionCount(c.Name()))
}
consumers[0].Close()
// wait for the broker to reschedule the active consumers
time.Sleep(time.Second * 3)
allConsume(consumers)
// the closed consumer will not be notified
assert.Equal(t, 3, listener.getConsumerCount())
// the remaining consumers will cover all partitions
assert.Equal(t, 3, listener.getPartitionCount(consumers[1].Name())+listener.getPartitionCount(consumers[2].Name()))
for _, c := range consumers {
c.Close()
}
}
func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "6")
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
})
assert.Nil(t, err)
defer producer.Close()
topics, err := client.TopicPartitions(topic)
assert.Nil(t, err)
assert.Equal(t, topic+"-partition-0", topics[0])
assert.Equal(t, topic+"-partition-1", topics[1])
assert.Equal(t, topic+"-partition-2", topics[2])
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
ReceiverQueueSize: 10,
Decryption: &MessageDecryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail,
},
})
assert.Nil(t, err)
defer consumer.Close()
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
msgs := make([]string, 0)
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
msgs = append(msgs, string(msg.Payload()))
t.Logf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
assert.Equal(t, len(msgs), 10)
}
func TestConsumerReceiveTimeout(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "test-topic-with-no-messages"
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub1",
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()
msg, err := consumer.Receive(ctx)
assert.Nil(t, msg)
assert.NotNil(t, err)
}
func TestConsumerShared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/testMultiPartitionConsumerShared"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "3")
sub := "sub-shared-1"
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
Type: Shared,
})
assert.Nil(t, err)
defer consumer1.Close()
consumer2, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
Type: Shared,
})
assert.Nil(t, err)
defer consumer2.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages with unique payloads
for i := 0; i < 10; i++ {
if _, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
fmt.Println("sending message:", fmt.Sprintf("hello-%d", i))
}
readMsgs := 0
messages := make(map[string]struct{})
for readMsgs < 10 {
select {
case cm, ok := <-consumer1.Chan():
if !ok {
break
}
readMsgs++
payload := string(cm.Message.Payload())
messages[payload] = struct{}{}
t.Logf("consumer1 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
consumer1.Ack(cm.Message)
case cm, ok := <-consumer2.Chan():
if !ok {
break
}
readMsgs++
payload := string(cm.Message.Payload())
messages[payload] = struct{}{}
t.Logf("consumer2 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
consumer2.Ack(cm.Message)
}
}
assert.Equal(t, 10, len(messages))
}
func TestConsumerEventTime(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := "test-event-time"
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
})
assert.Nil(t, err)
defer consumer.Close()
et := timeFromUnixTimestampMillis(uint64(5))
_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("test"),
EventTime: et,
})
assert.Nil(t, err)
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, et, msg.EventTime())
assert.Equal(t, "test", string(msg.Payload()))
}
func TestConsumerWithoutEventTime(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := "test-without-event-time"
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
})
assert.Nil(t, err)
defer consumer.Close()
_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("test"),
})
assert.Nil(t, err)
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, int64(0), msg.EventTime().UnixNano())
assert.Equal(t, "test", string(msg.Payload()))
}
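// TestConsumerFlow verifies that a consumer with a small receiver queue (4) still receives
// all 100 messages in order, exercising the flow-permit handling.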
func TestConsumerFlow(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := "test-received-since-flow"
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
ReceiverQueueSize: 4,
})
assert.Nil(t, err)
defer consumer.Close()
for msgNum := 0; msgNum < 100; msgNum++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
}); err != nil {
t.Fatal(err)
}
}
for msgNum := 0; msgNum < 100; msgNum++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", msgNum), string(msg.Payload()))
}
}
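// TestConsumerAck verifies that only unacked messages are redelivered: after acking the
// first half and resubscribing, only the second half is received again.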
func TestConsumerAck(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
})
assert.Nil(t, err)
const N = 100
for i := 0; i < N; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}
}
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
if i < N/2 {
// Only acks the first half of messages
consumer.Ack(msg)
}
}
consumer.Close()
// Subscribe again
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()
// We should only receive the 2nd half of messages
for i := N / 2; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}
}
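// TestConsumerNoBatchCumulativeAck verifies that, with batching disabled, cumulatively
// acking the message at index N/2-1 acks the whole first half, so only the second half is
// redelivered after resubscribing.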
func TestConsumerNoBatchCumulativeAck(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
// disable batching
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Exclusive,
})
assert.Nil(t, err)
const N = 100
for i := 0; i < N; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}
}
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
if i == N/2-1 {
// cumulative acks the first half of messages
assert.Nil(t, consumer.AckCumulative(msg))
}
}
consumer.Close()
// Subscribe again
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()
// We should only receive the 2nd half of messages
for i := N / 2; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
assert.Nil(t, consumer.Ack(msg))
}
}
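// TestConsumerBatchCumulativeAck verifies cumulative ack with batched messages: acking the
// last message of the first batch (sub-1) or a message inside the second batch (sub-2) acks
// the entire first batch, so only the second batch is redelivered after resubscribing.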
func TestConsumerBatchCumulativeAck(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
c1, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Exclusive,
})
assert.Nil(t, err)
// c2 is used to test whether the previous batch can be acked
// when cumulatively acking a message id from the next batch
c2, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-2",
Type: Exclusive,
})
assert.Nil(t, err)
const N = 100
// send a batch
wg := sync.WaitGroup{}
for i := 0; i < N; i++ {
wg.Add(1)
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i))},
func(id MessageID, producerMessage *ProducerMessage, e error) {
assert.NoError(t, e)
wg.Done()
})
}
wg.Wait()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)
// send another batch
wg = sync.WaitGroup{}
for i := N; i < 2*N; i++ {
wg.Add(1)
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i))},
func(id MessageID, producerMessage *ProducerMessage, e error) {
assert.NoError(t, e)
wg.Done()
})
}
wg.Wait()
for i := 0; i < 2*N; i++ {
msg, err := c1.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
if i == N-1 {
// cumulative ack the first half of messages
c1.AckCumulative(msg)
} else if i == N {
// the (N+1)-th msg is in the second batch;
// cumulatively ack it to test whether the first batch can be acked
c2.AckCumulative(msg)
}
}
c1.Close()
c2.Close()
// Subscribe again
c1, err = client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Exclusive,
})
assert.Nil(t, err)
defer c1.Close()
// Subscribe again
c2, err = client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-2",
Type: Exclusive,
})
assert.Nil(t, err)
defer c2.Close()
// We should only receive the 2nd half of messages
for i := N; i < 2*N; i++ {
msg, err := c1.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
c1.Ack(msg)
}
// We should only receive the 2nd half of messages
for i := N; i < 2*N; i++ {
msg, err := c2.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
c2.Ack(msg)
}
}
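// TestConsumerNack verifies that nacked messages are redelivered after NackRedeliveryDelay:
// odd-indexed messages are nacked and must be received a second time.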
func TestConsumerNack(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
defer consumer.Close()
const N = 100
for i := 0; i < N; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}
}
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
if i%2 == 0 {
// Only acks even messages
consumer.Ack(msg)
} else {
// Fails to process odd messages
consumer.Nack(msg)
}
}
// Failed messages should be resent
// We should only receive the odd messages
for i := 1; i < N; i += 2 {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}
}
func TestConsumerCompression(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
// enable batching
p1, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
CompressionType: LZ4,
})
assert.Nil(t, err)
defer p1.Close()
// disable batching
p2, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
CompressionType: LZ4,
DisableBatching: true,
})
assert.Nil(t, err)
defer p2.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
})
assert.Nil(t, err)
defer consumer.Close()
const N = 100
for i := 0; i < N; i++ {
if _, err := p1.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d-batching-enabled", i)),
}); err != nil {
t.Fatal(err)
}
}
for i := 0; i < N; i++ {
if _, err := p2.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d-batching-disabled", i)),
}); err != nil {
t.Fatal(err)
}
}
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-enabled", i), string(msg.Payload()))
consumer.Ack(msg)
}
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-disabled", i), string(msg.Payload()))
consumer.Ack(msg)
}
}
func TestConsumerCompressionWithBatches(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
CompressionType: ZLib,
BatchingMaxPublishDelay: 1 * time.Minute,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
})
assert.Nil(t, err)
defer consumer.Close()
const N = 100
for i := 0; i < N; i++ {
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}, nil)
}
producer.FlushWithCtx(context.Background())
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}
}
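// TestConsumerSeek verifies that Seek repositions the consumer to a previously recorded
// MessageID, so the next Receive returns the message published with that ID.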
func TestConsumerSeek(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
})
assert.Nil(t, err)
defer consumer.Close()
// Use a value bigger than 1000 to fill up the queue channel (size 1000) and the message channel (size 10)
const N = 1100
var seekID MessageID
for i := 0; i < N; i++ {
id, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
if i == N-50 {
seekID = id
}
}
// Don't consume all messages so some stay in queues
for i := 0; i < N-20; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}
err = consumer.Seek(seekID)
assert.Nil(t, err)
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload()))
}
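// TestConsumerSeekByTime verifies that SeekByTime to a timestamp before all publishes
// rewinds the subscription so every message is received again.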
func TestConsumerSeekByTime(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
defer consumer.Close()
// Use a value bigger than 1000 to fill up the queue channel (size 1000) and the message channel (size 10)
const N = 1100
resetTimeStr := "100s"
retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
assert.Nil(t, err)
for i := 0; i < N; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
// Don't consume all messages so some stay in queues
for i := 0; i < N-20; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}
currentTimestamp := time.Now()
err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
assert.Nil(t, err)
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}
}
func TestConsumerMetadata(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()
topic := newTopicName()
props := map[string]string{
"key1": "value1",
}
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Properties: props,
})
if err != nil {
t.Fatal(err)
}
defer consumer.Close()
stats, err := topicStats(topic)
if err != nil {
t.Fatal(err)
}
subs := stats["subscriptions"].(map[string]interface{})
cons := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})
meta := cons["metadata"].(map[string]interface{})
assert.Equal(t, len(props), len(meta))
for k, v := range props {
mv := meta[k].(string)
assert.Equal(t, v, mv)
}
}
// Test for issue #140
// Don't block on receive if the consumer has been closed
func TestConsumerReceiveErrAfterClose(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()
topicName := newTopicName()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "my-sub",
})
if err != nil {
t.Fatal(err)
}
consumer.Close()
errorCh := make(chan error)
go func() {
_, recvErr := consumer.Receive(context.Background())
errorCh <- recvErr
}()
select {
case <-time.After(200 * time.Millisecond):
t.Fatal("Receive should return immediately after the consumer is closed")
case err = <-errorCh:
}
assert.Equal(t, ConsumerClosed, err.(*Error).result)
}
func TestDLQ(t *testing.T) {
DLQWithProducerOptions(t, nil)
}
func TestDLQWithProducerOptions(t *testing.T) {
DLQWithProducerOptions(t,
&ProducerOptions{
BatchingMaxPublishDelay: 100 * time.Millisecond,
BatchingMaxSize: 64 * 1024,
CompressionType: ZLib,
})
}
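// DLQWithProducerOptions is the shared body of the DLQ tests: messages nacked MaxDeliveries
// times must be routed to the dead letter topic, optionally using custom ProducerOptions for
// the internally created DLQ producer.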
func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
dlqTopic := newTopicName()
// create consumer on the DLQ topic to verify the routing
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: dlqTopic,
SubscriptionName: "dlq",
})
assert.Nil(t, err)
defer dlqConsumer.Close()
topic := newTopicName()
ctx := context.Background()
// create consumer
dlqPolicy := DLQPolicy{
MaxDeliveries: 3,
DeadLetterTopic: dlqTopic,
}
if prodOpt != nil {
dlqPolicy.ProducerOptions = *prodOpt
}
sub, consumerName := "my-sub", "my-consumer"
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
DLQ: &dlqPolicy,
Name: consumerName,
})
assert.Nil(t, err)
defer consumer.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}
// receive 10 messages and only ack half of them
for i := 0; i < 10; i++ {
msg, _ := consumer.Receive(context.Background())
if i%2 == 0 {
// ack message
consumer.Ack(msg)
} else {
consumer.Nack(msg)
}
}
// Receive the unacked messages 2 more times, failing at processing
for i := 0; i < 2; i++ {
for i := 0; i < 5; i++ {
msg, _ := consumer.Receive(context.Background())
consumer.Nack(msg)
}
}
// 5 Messages should now be routed to the DLQ
for i := 0; i < 5; i++ {
ctx, canc := context.WithTimeout(context.Background(), 5*time.Second)
defer canc()
msg, err := dlqConsumer.Receive(ctx)
assert.NoError(t, err)
expectedMsgIdx := 2*i + 1
expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx)
assert.Equal(t, []byte(expectMsg), msg.Payload())
// check the DLQ producer name
assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ", topic, sub, consumerName))
// check original messageId
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
// check original topic
assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic])
}
// No more messages on the DLQ
ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
msg, err := dlqConsumer.Receive(ctx)
assert.Error(t, err)
assert.Nil(t, msg)
// No more messages on regular consumer
ctx, canc = context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
msg, err = consumer.Receive(ctx)
assert.Error(t, err)
assert.Nil(t, msg)
}
func TestDLQMultiTopics(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
dlqTopic := newTopicName()
// create consumer on the DLQ topic to verify the routing
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: dlqTopic,
SubscriptionName: "dlq",
})
assert.Nil(t, err)
defer dlqConsumer.Close()
topicPrefix := newTopicName()
topics := make([]string, 10)
for i := 0; i < 10; i++ {
topics[i] = fmt.Sprintf("%s-%d", topicPrefix, i)
}
ctx := context.Background()
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: "my-sub",
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
DLQ: &DLQPolicy{
MaxDeliveries: 3,
DeadLetterTopic: dlqTopic,
ProducerOptions: ProducerOptions{
BatchingMaxPublishDelay: 100 * time.Millisecond,
},
},
})
assert.Nil(t, err)
defer consumer.Close()
// create one producer for each topic
producers := make([]Producer, 10)
for i, topic := range topics {
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
producers[i] = producer
}
// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producers[i].Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}
// receive 10 messages and only ack half of them
for i := 0; i < 10; i++ {
msg, _ := consumer.Receive(context.Background())
if i%2 == 0 {
// ack message
consumer.Ack(msg)
} else {
consumer.Nack(msg)
}
}
// Receive the unacked messages 2 more times, failing at processing
for i := 0; i < 2; i++ {
for i := 0; i < 5; i++ {
msg, _ := consumer.Receive(context.Background())
consumer.Nack(msg)
}
}
// 5 Messages should now be routed to the DLQ
for i := 0; i < 5; i++ {
ctx, canc := context.WithTimeout(context.Background(), 5*time.Second)
defer canc()
_, err := dlqConsumer.Receive(ctx)
assert.NoError(t, err)
}
// No more messages on the DLQ
ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
msg, err := dlqConsumer.Receive(ctx)
assert.Error(t, err)
assert.Nil(t, msg)
// No more messages on regular consumer
ctx, canc = context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
msg, err = consumer.Receive(ctx)
assert.Error(t, err)
assert.Nil(t, msg)
}
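// TestRLQ verifies the retry-letter-queue flow: every message is sent to ReconsumeLater
// (maxRedeliveries+1) times and then lands on the <topic>-<sub>-DLQ topic, after which
// neither the retry path nor the original subscription has messages left.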
func TestRLQ(t *testing.T) {
topic := newTopicName()
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "3")
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
maxRedeliveries := 2
N := 100
ctx := context.Background()
client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()
// 1. Pre-produce N messages
producer, err := client.CreateProducer(ProducerOptions{Topic: topic})
assert.Nil(t, err)
defer producer.Close()
for i := 0; i < N; i++ {
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MESSAGE_%d", i))})
assert.Nil(t, err)
}
// 2. Create consumer on the Retry Topic to reconsume N messages (maxRedeliveries+1) times
rlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{
MaxDeliveries: uint32(maxRedeliveries),
},
RetryEnable: true,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
defer rlqConsumer.Close()
rlqReceived := 0
for rlqReceived < N*(maxRedeliveries+1) {
msg, err := rlqConsumer.Receive(ctx)
assert.Nil(t, err)
rlqConsumer.ReconsumeLater(msg, 1*time.Second)
rlqReceived++
}
fmt.Println("retry consumed:", rlqReceived) // 300
// No more messages on the Retry Topic
rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer rlqCancel()
msg, err := rlqConsumer.Receive(rlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)
// 3. Create consumer on the DLQ topic to verify the routing
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: "persistent://public/default/" + topic + "-" + subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer dlqConsumer.Close()
dlqReceived := 0
for dlqReceived < N {
msg, err := dlqConsumer.Receive(ctx)
assert.Nil(t, err)
dlqConsumer.Ack(msg)
dlqReceived++
}
fmt.Println("dlq received:", dlqReceived) // 100
// No more messages on the DLQ Topic
dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer dlqCancel()
msg, err = dlqConsumer.Receive(dlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)
// 4. No more messages for same subscription
checkConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer checkConsumer.Close()
checkCtx, checkCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer checkCancel()
checkMsg, err := checkConsumer.Receive(checkCtx)
assert.Error(t, err)
assert.Nil(t, checkMsg)
}
func TestRLQWithCustomProperties(t *testing.T) {
topic := newTopicName()
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "3")
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
maxRedeliveries := 2
N := 100
ctx := context.Background()
client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()
// 1. Pre-produce N messages
producer, err := client.CreateProducer(ProducerOptions{Topic: topic})
assert.Nil(t, err)
defer producer.Close()
for i := 0; i < N; i++ {
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MESSAGE_%d", i))})
assert.Nil(t, err)
}
// 2. Create consumer on the Retry Topic to reconsume N messages (maxRedeliveries+1) times
rlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{
MaxDeliveries: uint32(maxRedeliveries),
},
RetryEnable: true,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
defer rlqConsumer.Close()
rlqReceived := 0
for rlqReceived < N*(maxRedeliveries+1) {
msg, err := rlqConsumer.Receive(ctx)
assert.Nil(t, err)
if msg.RedeliveryCount() > 0 {
msgProps := msg.Properties()
value, ok := msgProps["custom-key-1"]
assert.True(t, ok)
if ok {
assert.Equal(t, value, "custom-value-1")
}
rlqConsumer.ReconsumeLater(msg, 1*time.Second)
} else {
customProps := map[string]string{
"custom-key-1": "custom-val-1",
}
rlqConsumer.ReconsumeLaterWithCustomProperties(msg, customProps, 1*time.Second)
}
rlqReceived++
}
fmt.Println("retry consumed:", rlqReceived) // 300
// No more messages on the Retry Topic
rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer rlqCancel()
msg, err := rlqConsumer.Receive(rlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)
// 3. Create consumer on the DLQ topic to verify the routing
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: "persistent://public/default/" + topic + "-" + subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer dlqConsumer.Close()
dlqReceived := 0
for dlqReceived < N {
msg, err := dlqConsumer.Receive(ctx)
assert.Nil(t, err)
dlqConsumer.Ack(msg)
dlqReceived++
}
fmt.Println("dlq received:", dlqReceived) // 100
// No more messages on the DLQ Topic
dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer dlqCancel()
msg, err = dlqConsumer.Receive(dlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)
// 4. No more messages for same subscription
checkConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer checkConsumer.Close()
checkCtx, checkCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer checkCancel()
checkMsg, err := checkConsumer.Receive(checkCtx)
assert.Error(t, err)
assert.Nil(t, checkMsg)
}
func TestAckWithResponse(t *testing.T) {
now := time.Now().Unix()
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now)
ctx := context.Background()
client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic01,
SubscriptionName: "my-sub",
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
AckWithResponse: true,
})
assert.Nil(t, err)
defer consumer.Close()
producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01})
assert.Nil(t, err)
defer producer01.Close()
for i := 0; i < 10; i++ {
_, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))})
assert.Nil(t, err)
}
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
err = consumer.Ack(msg)
assert.Nil(t, err)
}
}
func TestCumulativeAckWithResponse(t *testing.T) {
now := time.Now().Unix()
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now)
ctx := context.Background()
client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic01,
SubscriptionName: "my-sub",
Type: Exclusive,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
AckWithResponse: true,
})
assert.Nil(t, err)
defer consumer.Close()
producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01})
assert.Nil(t, err)
defer producer01.Close()
for i := 0; i < 10; i++ {
_, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))})
assert.Nil(t, err)
}
var msg Message
for i := 0; i < 10; i++ {
msg, err = consumer.Receive(ctx)
assert.Nil(t, err)
}
err = consumer.AckCumulative(msg)
assert.Nil(t, err)
}
func TestRLQMultiTopics(t *testing.T) {
now := time.Now().Unix()
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
topic02 := fmt.Sprintf("topic-%d-2", now)
topics := []string{topic01, topic02}
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
maxRedeliveries := 2
N := 100
ctx := context.Background()
client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()
// subscribe to multiple topics with retry enabled
rlqConsumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{MaxDeliveries: uint32(maxRedeliveries)},
RetryEnable: true,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
defer rlqConsumer.Close()
// subscribe to the DLQ topic
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topics[0] + "-" + subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer dlqConsumer.Close()
// create one producer per topic
producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01})
assert.Nil(t, err)
defer producer01.Close()
producer02, err := client.CreateProducer(ProducerOptions{Topic: topic02})
assert.Nil(t, err)
defer producer02.Close()
// 1. Pre-produce N messages for every topic
for i := 0; i < N; i++ {
_, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))})
assert.Nil(t, err)
_, err = producer02.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_02_%d", i))})
assert.Nil(t, err)
}
// 2. Reconsume the 2*N messages (maxRedeliveries+1) times through the retry topics
rlqReceived := 0
for rlqReceived < 2*N*(maxRedeliveries+1) {
msg, err := rlqConsumer.Receive(ctx)
assert.Nil(t, err)
rlqConsumer.ReconsumeLater(msg, 1*time.Second)
rlqReceived++
}
fmt.Println("retry consumed:", rlqReceived) // 600
// No more messages on the Retry Topic
rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer rlqCancel()
msg, err := rlqConsumer.Receive(rlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)
// 3. Drain the DLQ topic to verify the routing
dlqReceived := 0
for dlqReceived < 2*N {
msg, err := dlqConsumer.Receive(ctx)
assert.Nil(t, err)
dlqConsumer.Ack(msg)
dlqReceived++
}
fmt.Println("dlq received:", dlqReceived) // 200
// No more messages on the DLQ Topic
dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer dlqCancel()
msg, err = dlqConsumer.Receive(dlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)
// 4. No more messages for same subscription
checkConsumer, err := client.Subscribe(ConsumerOptions{
Topics: []string{topic01, topic02},
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer checkConsumer.Close()
timeoutCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
checkMsg, err := checkConsumer.Receive(timeoutCtx)
assert.Error(t, err)
assert.Nil(t, checkMsg)
}
func TestRLQSpecifiedPartitionTopic(t *testing.T) {
topic := newTopicName()
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "1")
normalTopic := "persistent://public/default/" + topic
partitionTopic := normalTopic + "-partition-0"
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
maxRedeliveries := 2
N := 100
ctx := context.Background()
client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()
// subscribe to a single partition of the topic
rlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: partitionTopic,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{MaxDeliveries: uint32(maxRedeliveries)},
RetryEnable: true,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
defer rlqConsumer.Close()
// subscribe to the DLQ topic
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: normalTopic + "-" + subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer dlqConsumer.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{Topic: normalTopic})
assert.Nil(t, err)
defer producer.Close()
// 1. Pre-produce N messages
for i := 0; i < N; i++ {
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))})
assert.Nil(t, err)
}
// 2. Reconsume the N messages (maxRedeliveries+1) times through the retry topic
rlqReceived := 0
for rlqReceived < N*(maxRedeliveries+1) {
msg, err := rlqConsumer.Receive(ctx)
assert.Nil(t, err)
rlqConsumer.ReconsumeLater(msg, 1*time.Second)
rlqReceived++
}
fmt.Println("retry consumed:", rlqReceived) // 300
// No more messages on the Retry Topic
rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer rlqCancel()
msg, err := rlqConsumer.Receive(rlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)
// 3. Drain the DLQ topic to verify the routing
dlqReceived := 0
for dlqReceived < N {
msg, err := dlqConsumer.Receive(ctx)
assert.Nil(t, err)
dlqConsumer.Ack(msg)
dlqReceived++
}
fmt.Println("dlq received:", dlqReceived) // 100
// No more messages on the DLQ Topic
dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer dlqCancel()
msg, err = dlqConsumer.Receive(dlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)
// 4. No more messages for same subscription
checkConsumer, err := client.Subscribe(ConsumerOptions{
Topic: normalTopic,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer checkConsumer.Close()
timeoutCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
checkMsg, err := checkConsumer.Receive(timeoutCtx)
assert.Error(t, err)
assert.Nil(t, checkMsg)
}
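// TestGetDeliveryCount verifies that Message.RedeliveryCount() increases by one on each
// nack-triggered redelivery.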
func TestGetDeliveryCount(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
ctx := context.Background()
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}
// receive 10 messages and only ack half of them
for i := 0; i < 10; i++ {
msg, _ := consumer.Receive(context.Background())
if i%2 == 0 {
// ack message
consumer.Ack(msg)
} else {
consumer.Nack(msg)
}
}
// Receive the unacked messages 2 more times, failing at processing
for i := 0; i < 2; i++ {
var msg Message
for i := 0; i < 5; i++ {
msg, err = consumer.Receive(context.Background())
assert.Nil(t, err)
consumer.Nack(msg)
}
assert.Equal(t, uint32(i+1), msg.RedeliveryCount())
}
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
assert.Equal(t, uint32(3), msg.RedeliveryCount())
}
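// TestConsumerAddTopicPartitions verifies partition auto-discovery: after growing the topic
// from 3 to 10 partitions, a key-routed producer reaches every partition and the consumer
// receives messages from all of them.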
func TestConsumerAddTopicPartitions(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "3")
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
MessageRouter: func(msg *ProducerMessage, topicMetadata TopicMetadata) int {
// The message key contains the id of the partition to route to
i, err := strconv.Atoi(msg.Key)
assert.NoError(t, err)
return i
},
PartitionsAutoDiscoveryInterval: 100 * time.Millisecond,
})
assert.Nil(t, err)
defer producer.Close()
// Increase number of partitions to 10
makeHTTPCall(t, http.MethodPost, testURL, "10")
// Wait for the producer/consumers to pick up the change
time.Sleep(1 * time.Second)
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
AutoDiscoveryPeriod: 100 * time.Millisecond,
})
assert.Nil(t, err)
defer consumer.Close()
// Publish messages ensuring that they will go to all the partitions
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Key: fmt.Sprintf("%d", i),
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
msgs := make([]string, 0)
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
msgs = append(msgs, string(msg.Payload()))
t.Logf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
assert.Equal(t, len(msgs), 10)
}
func TestConsumerNegativeReceiverQueueSize(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
ReceiverQueueSize: -1,
})
defer func() {
if consumer != nil {
consumer.Close()
}
}()
assert.Nil(t, err)
}
func TestProducerName(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
producerName := "test-producer-name"
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Name: producerName,
})
assert.Nil(t, err)
defer producer.Close()
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
defer consumer.Close()
// publish 10 messages to topic
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, msg.ProducerName(), producerName)
consumer.Ack(msg)
}
}
type noopConsumerInterceptor struct{}
func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}
func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}
// copyPropertyInterceptor copies every key in the message properties map and re-adds it with a suffix
type copyPropertyInterceptor struct {
suffix string
}
func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
properties := message.Properties()
copy := make(map[string]string, len(properties))
for k, v := range properties {
copy[k+x.suffix] = v
}
for ck, v := range copy {
properties[ck] = v
}
}
func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}
func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}
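// metricConsumerInterceptor counts acknowledged messages and negatively acknowledged
// message ids via atomic counters.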
type metricConsumerInterceptor struct {
ackn int32
nackn int32
}
func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {
atomic.AddInt32(&x.ackn, 1)
}
func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
}
func TestConsumerWithInterceptors(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
ctx := context.Background()
metric := &metricConsumerInterceptor{}
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
NackRedeliveryDelay: time.Second, // for testing nack
Interceptors: ConsumerInterceptors{
noopConsumerInterceptor{},
copyPropertyInterceptor{suffix: "-copy"},
metric,
},
})
assert.Nil(t, err)
defer consumer.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
}); err != nil {
log.Fatal(err)
}
}
var nackIds []MessageID
// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
"key-1-copy": "pulsar-1", // check properties copy by interceptor
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
if i%2 == 0 {
consumer.Ack(msg)
} else {
nackIds = append(nackIds, msg.ID())
}
}
assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn))
for i := range nackIds {
consumer.NackID(nackIds[i])
}
// receive the 5 nacked messages after redelivery
for i := 0; i < 5; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
expectMsg := fmt.Sprintf("hello-%d", i*2+1)
expectProperties := map[string]string{
"key-1": "pulsar-1",
"key-1-copy": "pulsar-1", // check properties copy by interceptor
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
consumer.Ack(msg)
}
assert.Equal(t, int32(5), atomic.LoadInt32(&metric.nackn))
}
func TestConsumerName(t *testing.T) {
assert := assert.New(t)
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(err)
defer client.Close()
topic := newTopicName()
// create consumer
consumerName := "test-consumer-name"
consumer, err := client.Subscribe(ConsumerOptions{
Name: consumerName,
Topic: topic,
SubscriptionName: "my-sub",
})
assert.Nil(err)
defer consumer.Close()
assert.Equal(consumerName, consumer.Name())
}
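// TestKeyBasedBatchProducerConsumerKeyShared checks that, with the key-based batch builder and a
// KeyShared subscription, every key is delivered to a single consumer and its messages arrive in publish order.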
func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
const MsgBatchCount = 100
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/test-key-based-batch-with-key-shared"
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer1.Close()
consumer2, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer2.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
BatcherBuilderType: KeyBasedBatchBuilder,
BatchingMaxMessages: 10,
})
assert.Nil(t, err)
defer producer.Close()
ctx := context.Background()
keys := []string{"key1", "key2", "key3"}
for i := 0; i < MsgBatchCount; i++ {
for _, k := range keys {
producer.SendAsync(ctx, &ProducerMessage{
Key: k,
Payload: []byte(fmt.Sprintf("value-%d", i)),
}, func(id MessageID, producerMessage *ProducerMessage, err error) {
assert.Nil(t, err)
},
)
}
}
receivedConsumer1 := 0
receivedConsumer2 := 0
consumer1Keys := make(map[string]int)
consumer2Keys := make(map[string]int)
for (receivedConsumer1 + receivedConsumer2) < 300 {
select {
case cm, ok := <-consumer1.Chan():
if !ok {
break
}
receivedConsumer1++
cnt := 0
if _, has := consumer1Keys[cm.Key()]; has {
cnt = consumer1Keys[cm.Key()]
}
assert.Equal(
t, fmt.Sprintf("value-%d", cnt),
string(cm.Payload()),
)
consumer1Keys[cm.Key()] = cnt + 1
consumer1.Ack(cm.Message)
case cm, ok := <-consumer2.Chan():
if !ok {
break
}
receivedConsumer2++
cnt := 0
if _, has := consumer2Keys[cm.Key()]; has {
cnt = consumer2Keys[cm.Key()]
}
assert.Equal(
t, fmt.Sprintf("value-%d", cnt),
string(cm.Payload()),
)
consumer2Keys[cm.Key()] = cnt + 1
consumer2.Ack(cm.Message)
}
}
assert.NotEqual(t, 0, receivedConsumer1)
assert.NotEqual(t, 0, receivedConsumer2)
assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
receivedConsumer1, receivedConsumer2)
assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumser2: %v\n",
consumer1Keys, consumer2Keys)
}
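// TestOrderingOfKeyBasedBatchProducerConsumerKeyShared checks that key-based batches are delivered
// grouped by key and in per-key order, first using the message Key and then using OrderingKey.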
func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
const MsgBatchCount = 10
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/test-ordering-of-key-based-batch-with-key-shared"
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer1.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
BatcherBuilderType: KeyBasedBatchBuilder,
BatchingMaxMessages: 30,
BatchingMaxPublishDelay: time.Second * 5,
})
assert.Nil(t, err)
defer producer.Close()
ctx := context.Background()
keys := []string{"key1", "key2", "key3"}
for i := 0; i < MsgBatchCount; i++ {
for _, k := range keys {
producer.SendAsync(ctx, &ProducerMessage{
Key: k,
Payload: []byte(fmt.Sprintf("value-%d", i)),
}, func(id MessageID, producerMessage *ProducerMessage, err error) {
assert.Nil(t, err)
},
)
}
}
var receivedKey string
var receivedMessageIndex int
for i := 0; i < len(keys)*MsgBatchCount; i++ {
cm, ok := <-consumer1.Chan()
if !ok {
break
}
if receivedKey != cm.Key() {
receivedKey = cm.Key()
receivedMessageIndex = 0
}
assert.Equal(
t, fmt.Sprintf("value-%d", receivedMessageIndex%10),
string(cm.Payload()),
)
consumer1.Ack(cm.Message)
receivedMessageIndex++
}
// Test OrderingKey
for i := 0; i < MsgBatchCount; i++ {
for _, k := range keys {
u := uuid.New()
producer.SendAsync(ctx, &ProducerMessage{
Key: u.String(),
OrderingKey: k,
Payload: []byte(fmt.Sprintf("value-%d", i)),
}, func(id MessageID, producerMessage *ProducerMessage, err error) {
assert.Nil(t, err)
},
)
}
}
receivedKey = ""
receivedMessageIndex = 0
for i := 0; i < len(keys)*MsgBatchCount; i++ {
cm, ok := <-consumer1.Chan()
if !ok {
break
}
if receivedKey != cm.OrderingKey() {
receivedKey = cm.OrderingKey()
receivedMessageIndex = 0
}
assert.Equal(
t, fmt.Sprintf("value-%d", receivedMessageIndex%10),
string(cm.Payload()),
)
consumer1.Ack(cm.Message)
receivedMessageIndex++
}
}
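// TestConsumerKeySharedWithOrderingKey publishes messages with random keys but a small set of
// ordering keys to a KeyShared subscription and checks that both consumers receive messages and none are lost.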
func TestConsumerKeySharedWithOrderingKey(t *testing.T) {
client, err := NewClient(
ClientOptions{
URL: lookupURL,
},
)
assert.Nil(t, err)
defer client.Close()
topic := "persistent://public/default/test-key-shared-with-ordering-key"
consumer1, err := client.Subscribe(
ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
},
)
assert.Nil(t, err)
defer consumer1.Close()
consumer2, err := client.Subscribe(
ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
},
)
assert.Nil(t, err)
defer consumer2.Close()
// create producer
producer, err := client.CreateProducer(
ProducerOptions{
Topic: topic,
DisableBatching: true,
},
)
assert.Nil(t, err)
defer producer.Close()
ctx := context.Background()
for i := 0; i < 100; i++ {
u := uuid.New()
_, err := producer.Send(
ctx, &ProducerMessage{
Key: u.String(),
OrderingKey: fmt.Sprintf("key-shared-%d", i%3),
Payload: []byte(fmt.Sprintf("value-%d", i)),
},
)
assert.Nil(t, err)
}
receivedConsumer1 := 0
receivedConsumer2 := 0
for (receivedConsumer1 + receivedConsumer2) < 100 {
select {
case cm, ok := <-consumer1.Chan():
if !ok {
break
}
receivedConsumer1++
consumer1.Ack(cm.Message)
case cm, ok := <-consumer2.Chan():
if !ok {
break
}
receivedConsumer2++
consumer2.Ack(cm.Message)
}
}
assert.NotEqual(t, 0, receivedConsumer1)
assert.NotEqual(t, 0, receivedConsumer2)
t.Logf(
"TestConsumerKeySharedWithOrderingKey received messages consumer1: %d consumser2: %d\n",
receivedConsumer1, receivedConsumer2,
)
assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
}
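// TestProducerConsumerRSAEncryption verifies that an RSA-encrypting producer and a consumer with the
// matching key reader can exchange messages, while a consumer without decryption info cannot read them.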
func TestProducerConsumerRSAEncryption(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
cryptoConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Decryption: &MessageDecryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail,
},
SubscriptionName: "crypto-subscription",
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
normalConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "normal-subscription",
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
cryptoProducer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
msgFormat := "my-message-%v"
totalMessages := 10
ctx := context.Background()
for i := 0; i < totalMessages; i++ {
_, err := cryptoProducer.Send(ctx, &ProducerMessage{
Value: fmt.Sprintf(msgFormat, i),
})
assert.Nil(t, err)
}
// try to consume with normal consumer
normalConsumerCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
msg, err := normalConsumer.Receive(normalConsumerCtx)
// msg should be nil as the consumer cannot decrypt the message
assert.NotNil(t, err)
assert.Nil(t, msg)
// try to consume the messages with the crypto consumer;
// it should be able to read all of them
var actualMessage *string
for i := 0; i < totalMessages; i++ {
msg, err := cryptoConsumer.Receive(ctx)
fmt.Println(msg)
assert.Nil(t, err)
expectedMsg := fmt.Sprintf(msgFormat, i)
err = msg.GetSchemaValue(&actualMessage)
assert.Nil(t, err)
assert.Equal(t, expectedMsg, *actualMessage)
cryptoConsumer.Ack(msg)
}
}
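// TestProducerConsumerRSAEncryptionWithCompression repeats the RSA encryption round trip with LZ4
// compression enabled on the producer.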
func TestProducerConsumerRSAEncryptionWithCompression(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
cryptoConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Decryption: &MessageDecryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
},
SubscriptionName: "crypto-subscription",
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
normalConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "normal-subscription",
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
cryptoProducer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
Schema: NewStringSchema(nil),
CompressionType: LZ4,
})
assert.Nil(t, err)
msgFormat := "my-message-%v"
totalMessages := 10
ctx := context.Background()
for i := 0; i < totalMessages; i++ {
_, err := cryptoProducer.Send(ctx, &ProducerMessage{
Value: fmt.Sprintf(msgFormat, i),
})
assert.Nil(t, err)
}
// try to consume with normal consumer
normalConsumerCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
msg, err := normalConsumer.Receive(normalConsumerCtx)
// msg should be nil as the consumer cannot decrypt the message
assert.NotNil(t, err)
assert.Nil(t, msg)
// try to consume the messages with the crypto consumer;
// it should be able to read all of them
var actualMessage *string
for i := 0; i < totalMessages; i++ {
msg, err := cryptoConsumer.Receive(ctx)
assert.Nil(t, err)
expectedMsg := fmt.Sprintf(msgFormat, i)
err = msg.GetSchemaValue(&actualMessage)
assert.Nil(t, err)
assert.Equal(t, expectedMsg, *actualMessage)
cryptoConsumer.Ack(msg)
}
}
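// TestBatchProducerConsumerRSAEncryptionWithCompression adds batching on top of RSA encryption and
// LZ4 compression and verifies the messages still round trip.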
func TestBatchProducerConsumerRSAEncryptionWithCompression(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
cryptoConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Decryption: &MessageDecryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
},
SubscriptionName: "crypto-subscription",
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
normalConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "normal-subscription",
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
batchSize := 2
cryptoProducer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
Schema: NewStringSchema(nil),
CompressionType: LZ4,
DisableBatching: false,
BatchingMaxMessages: uint(batchSize),
})
assert.Nil(t, err)
msgFormat := "my-message-%v"
totalMessages := 10
ctx := context.Background()
for i := 0; i < totalMessages; i++ {
_, err := cryptoProducer.Send(ctx, &ProducerMessage{
Value: fmt.Sprintf(msgFormat, i),
})
assert.Nil(t, err)
}
// try to consume with normal consumer
normalConsumerCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
msg, err := normalConsumer.Receive(normalConsumerCtx)
// msg should be nil as the consumer cannot decrypt the message
assert.NotNil(t, err)
assert.Nil(t, msg)
// try to consume the messages with the crypto consumer;
// it should be able to read all of them
var actualMessage *string
for i := 0; i < totalMessages; i++ {
msg, err := cryptoConsumer.Receive(ctx)
assert.Nil(t, err)
expectedMsg := fmt.Sprintf(msgFormat, i)
err = msg.GetSchemaValue(&actualMessage)
assert.Nil(t, err)
assert.Equal(t, expectedMsg, *actualMessage)
cryptoConsumer.Ack(msg)
}
}
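// TestProducerConsumerRedeliveryOfFailedEncryptedMessages verifies that messages which cannot be
// decrypted by some consumers on a shared subscription are redelivered and eventually received by a
// consumer holding the correct keys.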
func TestProducerConsumerRedeliveryOfFailedEncryptedMessages(t *testing.T) {
// create new client instance for each producer and consumer
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer client.Close()
clientCryptoConsumer, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer clientCryptoConsumer.Close()
clientCryptoConsumerInvalidKeyReader, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer clientCryptoConsumerInvalidKeyReader.Close()
clientcryptoConsumerNoKeyReader, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer clientcryptoConsumerNoKeyReader.Close()
topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
cryptoProducer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
CompressionType: LZ4,
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
sharedSubscription := "crypto-shared-subscription"
cryptoConsumer, err := clientCryptoConsumer.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sharedSubscription,
Decryption: &MessageDecryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
},
Schema: NewStringSchema(nil),
Type: Shared,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
cryptoConsumerInvalidKeyReader, err := clientCryptoConsumerInvalidKeyReader.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sharedSubscription,
Decryption: &MessageDecryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa_invalid.pem"),
},
Schema: NewStringSchema(nil),
Type: Shared,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
cryptoConsumerNoKeyReader, err := clientcryptoConsumerNoKeyReader.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sharedSubscription,
Schema: NewStringSchema(nil),
Type: Shared,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
totalMessages := 5
message := "my-message-%v"
// since messages can arrive in any order,
// a map is used to check that all the messages are received
messageMap := map[string]struct{}{}
// produce messages
for i := 0; i < totalMessages; i++ {
mid, err := cryptoProducer.Send(context.Background(), &ProducerMessage{
Value: fmt.Sprintf(message, i),
})
assert.Nil(t, err)
t.Logf("Sent : %v\n", mid)
}
// Consume from consumers 2 and 3:
// no message should be returned since they cannot decrypt the messages
ctxWithTimeOut1, c1 := context.WithTimeout(context.Background(), 2*time.Second)
defer c1()
ctxWithTimeOut2, c2 := context.WithTimeout(context.Background(), 2*time.Second)
defer c2()
// try to consume messages
msg, err := cryptoConsumerInvalidKeyReader.Receive(ctxWithTimeOut1)
assert.NotNil(t, err)
assert.Nil(t, msg)
msg, err = cryptoConsumerNoKeyReader.Receive(ctxWithTimeOut2)
assert.NotNil(t, err)
assert.Nil(t, msg)
cryptoConsumerInvalidKeyReader.Close()
cryptoConsumerNoKeyReader.Close()
// try to consume with consumer 1;
// all the messages should be received by it
var receivedMsg *string
for i := 0; i < totalMessages; i++ {
m, err := cryptoConsumer.Receive(context.Background())
assert.Nil(t, err)
err = m.GetSchemaValue(&receivedMsg)
assert.Nil(t, err)
messageMap[*receivedMsg] = struct{}{}
cryptoConsumer.Ack(m)
t.Logf("Received : %v\n", m.ID())
}
// check if all messages were received
for i := 0; i < totalMessages; i++ {
key := fmt.Sprintf(message, i)
_, ok := messageMap[key]
assert.True(t, ok)
}
}
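// TestRSAEncryptionFailure walks through encryption failure modes: sends fail with an invalid key
// pair, and a consumer's behaviour on decryption failure follows the configured
// ConsumerCryptoFailureAction (Fail by default, Consume, or Discard).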
func TestRSAEncryptionFailure(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer client.Close()
topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
// 1. invalid key name
// create producer with invalid key
// producer creation succeeds but message sending should fail with an error
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa_invalid.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
Schema: NewStringSchema(nil),
DisableBatching: true,
})
assert.Nil(t, err)
assert.NotNil(t, producer)
// sending a message should fail with an error, since invalid RSA keys are configured
mid, err := producer.Send(context.Background(), &ProducerMessage{
Value: "some-message",
})
assert.Nil(t, mid)
assert.NotNil(t, err)
producer.Close()
// 2. Producer with valid key name
producer, err = client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
Schema: NewStringSchema(nil),
DisableBatching: true,
})
assert.Nil(t, err)
assert.NotNil(t, producer)
subscriptionName := "enc-failure-subcription"
totalMessages := 10
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
})
assert.Nil(t, err)
messageFormat := "my-message-%v"
for i := 0; i < totalMessages; i++ {
_, err := producer.Send(context.Background(), &ProducerMessage{
Value: fmt.Sprintf(messageFormat, i),
})
assert.Nil(t, err)
}
// 3. KeyReader is not set on the consumer
// Receive should fail since no KeyReader is set up,
// because the consumer's default behaviour is to fail receiving a message when decryption fails
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
msg, err := consumer.Receive(ctx)
assert.NotNil(t, err)
assert.Nil(t, msg, "Receive should have failed with no keyreader")
// 4. Set consumer config to consume even if decryption fails
consumer.Close()
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
Decryption: &MessageDecryptionInfo{
ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionConsume,
},
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
assert.NotNil(t, consumer)
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel2()
for i := 0; i < totalMessages-1; i++ {
expectedMessage := fmt.Sprintf(messageFormat, i)
msg, err = consumer.Receive(ctx2)
assert.Nil(t, err)
assert.NotNil(t, msg)
receivedMsg := string(msg.Payload())
assert.NotEqual(t, expectedMessage, receivedMsg, fmt.Sprintf(`Received encrypted message [%v]
should not match the expected message [%v]`, receivedMsg, expectedMessage))
// verify the message contains Encryption context
assert.NotEmpty(t, msg.GetEncryptionContext(),
"Encrypted message which is failed to decrypt must contain EncryptionContext")
consumer.Ack(msg)
}
// 5. discard action on decryption failure
consumer.Close()
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
Decryption: &MessageDecryptionInfo{
ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionDiscard,
},
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
assert.NotNil(t, consumer)
ctx3, cancel3 := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel3()
msg, err = consumer.Receive(ctx3)
assert.NotNil(t, err)
assert.Nil(t, msg, "Message received even aftet ConsumerCryptoFailureAction.Discard is set.")
}
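// TestConsumerCompressionWithRSAEncryption verifies that LZ4-compressed, RSA-encrypted messages round trip correctly.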
func TestConsumerCompressionWithRSAEncryption(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
CompressionType: LZ4,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"enc-compress-app.key"},
},
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Decryption: &MessageDecryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
},
})
assert.Nil(t, err)
defer consumer.Close()
const N = 100
for i := 0; i < N; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}
}
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}
}
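// TestBatchMessageReceiveWithCompressionAndRSAEncryption verifies batched message delivery with LZ4
// compression and RSA encryption enabled on the producer.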
func TestBatchMessageReceiveWithCompressionAndRSAEncryption(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := "persistent://public/default/receive-batch-comp-enc"
subName := "subscription-name"
prefix := "msg-batch-"
ctx := context.Background()
// Enable batching on producer side
batchSize, numOfMessages := 2, 100
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
BatchingMaxMessages: uint(batchSize),
DisableBatching: false,
CompressionType: LZ4,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"batch-encryption-app.key"},
},
})
assert.Nil(t, err)
assert.Equal(t, topicName, producer.Topic())
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
Decryption: &MessageDecryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
},
})
assert.Nil(t, err)
defer consumer.Close()
count := 0
for i := 0; i < numOfMessages; i++ {
messageContent := prefix + fmt.Sprintf("%d", i)
msg := &ProducerMessage{
Payload: []byte(messageContent),
}
_, err := producer.Send(ctx, msg)
assert.Nil(t, err)
}
for i := 0; i < numOfMessages; i++ {
msg, err := consumer.Receive(ctx)
t.Logf("received : %v\n", string(msg.Payload()))
assert.Nil(t, err)
consumer.Ack(msg)
count++
}
assert.Equal(t, count, numOfMessages)
}
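// EncKeyReader is a crypto.KeyReader implementation that loads RSA keys from files and attaches a
// fixed metadata map (version 1.0) to every key.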
type EncKeyReader struct {
publicKeyPath string
privateKeyPath string
metaMap map[string]string
}
func NewEncKeyReader(publicKeyPath, privateKeyPath string) *EncKeyReader {
metaMap := map[string]string{
"version": "1.0",
}
return &EncKeyReader{
publicKeyPath: publicKeyPath,
privateKeyPath: privateKeyPath,
metaMap: metaMap,
}
}
// PublicKey reads the public key from the given path
func (d *EncKeyReader) PublicKey(keyName string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) {
return readKey(keyName, d.publicKeyPath, d.metaMap)
}
// PrivateKey reads the private key from the given path
func (d *EncKeyReader) PrivateKey(keyName string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) {
return readKey(keyName, d.privateKeyPath, d.metaMap)
}
func readKey(keyName, path string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) {
key, err := os.ReadFile(path)
if err != nil {
return nil, err
}
return crypto.NewEncryptionKeyInfo(keyName, key, keyMeta), nil
}
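// TestConsumerEncryptionWithoutKeyReader consumes an encrypted message with
// ConsumerCryptoFailureActionConsume and then decrypts and decompresses the raw payload manually
// using the message's EncryptionContext and the default MessageCrypto.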
func TestConsumerEncryptionWithoutKeyReader(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
encryptionKeyName := "client-rsa.pem"
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{encryptionKeyName},
},
CompressionType: LZ4,
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
assert.NotNil(t, producer)
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-subscription-name",
Decryption: &MessageDecryptionInfo{
ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionConsume,
},
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
message := "my-message"
_, err = producer.Send(context.Background(), &ProducerMessage{
Value: message,
})
assert.Nil(t, err)
// consume encrypted message
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
assert.NotNil(t, msg)
// try to decrypt message
encCtx := msg.GetEncryptionContext()
assert.NotEmpty(t, encCtx)
keys := encCtx.Keys
assert.Equal(t, 1, len(keys))
encryptionKey, ok := keys[encryptionKeyName]
assert.True(t, ok)
encDataKey := encryptionKey.KeyValue
assert.NotNil(t, encDataKey)
metadata := encryptionKey.Metadata
assert.NotNil(t, metadata)
version := metadata["version"]
assert.Equal(t, "1.0", version)
compressionType := encCtx.CompressionType
uncompressedSize := uint32(encCtx.UncompressedSize)
encParam := encCtx.Param
encAlgo := encCtx.Algorithm
batchSize := encCtx.BatchSize
// try to decrypt using default MessageCrypto
msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", false, plog.DefaultNopLogger())
assert.Nil(t, err)
producerName := "test"
sequenceID := uint64(123)
publishTime := uint64(12333453454)
messageMetaData := pb.MessageMetadata{
EncryptionParam: encParam,
ProducerName: &producerName,
SequenceId: &sequenceID,
PublishTime: &publishTime,
UncompressedSize: &uncompressedSize,
EncryptionAlgo: &encAlgo,
}
if compressionType == LZ4 {
messageMetaData.Compression = pb.CompressionType_LZ4.Enum()
}
messageMetaData.EncryptionKeys = []*pb.EncryptionKeys{{
Key: &encryptionKeyName,
Value: encDataKey,
}}
decryptedPayload, err := msgCrypto.Decrypt(crypto.NewMessageMetadataSupplier(&messageMetaData),
msg.Payload(),
NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"))
assert.Nil(t, err)
assert.NotNil(t, decryptedPayload)
// try to uncompress payload
uncompressedPayload := make([]byte, uncompressedSize)
s, err := lz4.UncompressBlock(decryptedPayload, uncompressedPayload)
assert.Nil(t, err)
assert.Equal(t, uncompressedSize, uint32(s))
buffer := internal.NewBufferWrapper(uncompressedPayload)
if batchSize > 0 {
size := buffer.ReadUint32()
var meta pb.SingleMessageMetadata
if err := proto.Unmarshal(buffer.Read(size), &meta); err != nil {
t.Fatal(err)
}
d := buffer.Read(uint32(meta.GetPayloadSize()))
assert.Equal(t, message, string(d))
}
}
// TestEncryptDecryptRedeliveryOnFailure tests redelivery of messages that failed decryption
func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
topic := newTopicName()
subscription := "test-subscription-redelivery"
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscription,
Decryption: &MessageDecryptionInfo{
KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_invalid_rsa.pem"),
},
})
assert.Nil(t, err)
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"new-enc-key"},
},
})
assert.Nil(t, err)
_, err = producer.Send(context.Background(), &ProducerMessage{
Payload: []byte("new-test-message"),
})
assert.Nil(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()
// message receive should fail due to decryption error
msg, err := consumer.Receive(ctx)
assert.Nil(t, msg)
assert.NotNil(t, err)
consumer.Close()
// create consumer with same subscription and proper rsa key pairs
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscription,
Decryption: &MessageDecryptionInfo{
KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
},
})
assert.Nil(t, err)
// previous message should be redelivered
msg, err = consumer.Receive(context.Background())
assert.Nil(t, err)
assert.NotNil(t, msg)
consumer.Ack(msg)
}
// TestConsumerSeekByTimeOnPartitionedTopic tests seek-by-time on a partitioned topic.
// It is based on the existing test case [TestConsumerSeekByTime], but runs against a partitioned topic.
func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
// Create topic with 5 partitions
topicAdminURL := "admin/v2/persistent/public/default/TestSeekByTimeOnPartitionedTopic/partitions"
err = httpPut(topicAdminURL, 5)
defer httpDelete(topicAdminURL)
assert.Nil(t, err)
topicName := "persistent://public/default/TestSeekByTimeOnPartitionedTopic"
partitions, err := client.TopicPartitions(topicName)
assert.Nil(t, err)
assert.Equal(t, len(partitions), 5)
for i := 0; i < 5; i++ {
assert.Equal(t, partitions[i],
fmt.Sprintf("%s-partition-%d", topicName, i))
}
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
defer consumer.Close()
// Use a value bigger than 1000 to fill up the queue channel (size 1000) and the message channel (size 10)
const N = 1100
resetTimeStr := "100s"
retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
assert.Nil(t, err)
for i := 0; i < N; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}
// Don't consume all messages so some stay in queues
for i := 0; i < N-20; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
consumer.Ack(msg)
}
currentTimestamp := time.Now()
err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
assert.Nil(t, err)
// should be able to consume all messages once again
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
consumer.Ack(msg)
}
}
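// TestAvailablePermitsLeak checks that messages discarded due to decryption failure
// (ConsumerCryptoFailureActionDiscard) still release flow-control permits, so a later unencrypted
// message can be received without Receive blocking.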
func TestAvailablePermitsLeak(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer client.Close()
topic := fmt.Sprintf("my-topic-test-ap-leak-%v", time.Now().Nanosecond())
// 1. Producer with valid key name
p1, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
Schema: NewStringSchema(nil),
DisableBatching: true,
})
assert.Nil(t, err)
assert.NotNil(t, p1)
subscriptionName := "enc-failure-subcription"
totalMessages := 1000
// 2. KeyReader is not set on the consumer
// Receive should fail since no KeyReader is set up,
// because the consumer's default behaviour is to fail receiving a message when decryption fails
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
})
assert.Nil(t, err)
messageFormat := "my-message-%v"
for i := 0; i < totalMessages; i++ {
_, err := p1.Send(context.Background(), &ProducerMessage{
Value: fmt.Sprintf(messageFormat, i),
})
assert.Nil(t, err)
}
// 3. Set up another producer that sends a message without crypto.
// The consumer can receive it correctly.
p2, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Schema: NewStringSchema(nil),
DisableBatching: true,
})
assert.Nil(t, err)
assert.NotNil(t, p2)
_, err = p2.Send(context.Background(), &ProducerMessage{
Value: fmt.Sprintf(messageFormat, totalMessages),
})
assert.Nil(t, err)
// 4. Discard action on decryption failure. Create an availablePermits leak scenario
consumer.Close()
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
Decryption: &MessageDecryptionInfo{
ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionDiscard,
},
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
assert.NotNil(t, consumer)
// 5. If availablePermits does not leak, the consumer can receive the last (unencrypted) message
// and ctx3 will not exceed its deadline.
ctx3, cancel3 := context.WithTimeout(context.Background(), 15*time.Second)
_, err = consumer.Receive(ctx3)
cancel3()
assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded),
"This means the resource is exhausted. consumer.Receive() will block forever.")
}
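// TestConsumerWithBackoffPolicy checks that successive reconnect attempts follow the configured
// backoff policy intervals (1s, 2s, then capped at 4s).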
func TestConsumerWithBackoffPolicy(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()
topicName := newTopicName()
backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
_consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
BackoffPolicy: backoff,
})
assert.Nil(t, err)
defer _consumer.Close()
partitionConsumerImp := _consumer.(*consumer).consumers[0]
// 1 s
startTime := time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
// 2 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
}
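// TestAckWithMessageID verifies that a message can be acknowledged through a MessageID rebuilt from
// its ledger ID, entry ID, batch index and partition index.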
func TestAckWithMessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()
// send messages
if _, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
}); err != nil {
log.Fatal(err)
}
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
id := message.ID()
newID := NewMessageID(id.LedgerID(), id.EntryID(), id.BatchIdx(), id.PartitionIdx())
err = consumer.AckID(newID)
assert.Nil(t, err)
}
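// TestBatchIndexAck runs runBatchIndexAckTest across combinations of AckWithResponse, cumulative
// acknowledgment and ack grouping options.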
func TestBatchIndexAck(t *testing.T) {
type config struct {
ackWithResponse bool
cumulative bool
ackGroupingOptions *AckGroupingOptions
}
configs := make([]config, 0)
for _, option := range []*AckGroupingOptions{
nil, // MaxSize: 1000, MaxTime: 10ms
{MaxSize: 0, MaxTime: 0},
{MaxSize: 1000, MaxTime: 0},
} {
configs = append(configs, config{true, true, option})
configs = append(configs, config{true, false, option})
configs = append(configs, config{false, true, option})
configs = append(configs, config{false, false, option})
}
for _, params := range configs {
option := params.ackGroupingOptions
if option == nil {
option = &AckGroupingOptions{1000, 10 * time.Millisecond}
}
t.Run(fmt.Sprintf("TestBatchIndexAck_WithResponse_%v_Cumulative_%v_AckGroupingOption_%v_%v",
params.ackWithResponse, params.cumulative, option.MaxSize, option.MaxTime.Milliseconds()),
func(t *testing.T) {
runBatchIndexAckTest(t, params.ackWithResponse, params.cumulative, params.ackGroupingOptions)
})
}
}
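// runBatchIndexAckTest publishes one full batch, acknowledges half of it (individually or
// cumulatively), re-subscribes and verifies that only the unacknowledged half is redelivered.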
func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, option *AckGroupingOptions) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
topic := newTopicName()
createConsumer := func() Consumer {
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
AckWithResponse: ackWithResponse,
EnableBatchIndexAcknowledgment: true,
AckGroupingOptions: option,
})
assert.Nil(t, err)
return consumer
}
consumer := createConsumer()
duration, err := time.ParseDuration("1h")
assert.Nil(t, err)
const BatchingMaxSize int = 2 * 5
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
BatchingMaxMessages: uint(BatchingMaxSize),
BatchingMaxSize: uint(1024 * 1024 * 10),
BatchingMaxPublishDelay: duration,
})
assert.Nil(t, err)
for i := 0; i < BatchingMaxSize; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-%d", i)),
}, func(id MessageID, producerMessage *ProducerMessage, err error) {
assert.Nil(t, err)
log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize())
})
}
assert.Nil(t, producer.FlushWithCtx(context.Background()))
msgIds := make([]MessageID, BatchingMaxSize)
for i := 0; i < BatchingMaxSize; i++ {
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
msgIds[i] = message.ID()
log.Printf("Received %v from %v:%d:%d", string(message.Payload()), message.ID(),
message.ID().BatchIdx(), message.ID().BatchSize())
}
// Acknowledge half of the messages
if cumulative {
msgID := msgIds[BatchingMaxSize/2-1]
consumer.AckIDCumulative(msgID)
log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx())
} else {
for i := 0; i < BatchingMaxSize; i++ {
msgID := msgIds[i]
if i%2 == 0 {
consumer.AckID(msgID)
log.Printf("Acknowledge %v:%d\n", msgID, msgID.BatchIdx())
}
}
}
consumer.Close()
consumer = createConsumer()
for i := 0; i < BatchingMaxSize/2; i++ {
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
log.Printf("Received %v from %v:%d:%d", string(message.Payload()), message.ID(),
message.ID().BatchIdx(), message.ID().BatchSize())
index := i*2 + 1
if cumulative {
index = i + BatchingMaxSize/2
}
assert.Equal(t, []byte(fmt.Sprintf("msg-%d", index)), message.Payload())
assert.Equal(t, msgIds[index].BatchIdx(), message.ID().BatchIdx())
// We should not acknowledge message.ID() here because message.ID() uses a different
// ack tracker than the entries in msgIds
if !cumulative {
msgID := msgIds[index]
consumer.AckID(msgID)
log.Printf("Acknowledge %v:%d\n", msgID, msgID.BatchIdx())
}
}
if cumulative {
msgID := msgIds[BatchingMaxSize-1]
consumer.AckIDCumulative(msgID)
log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx())
}
consumer.Close()
consumer = createConsumer()
_, err = producer.Send(context.Background(), &ProducerMessage{Payload: []byte("end-marker")})
assert.Nil(t, err)
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
assert.Equal(t, "end-marker", string(msg.Payload()))
client.Close()
}
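// TestConsumerWithAutoScaledQueueReceive checks that with EnableAutoScaledReceiverQueueSize the
// receiver queue starts at 1 and grows as messages are prefetched.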
func TestConsumerWithAutoScaledQueueReceive(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
// create consumer
c, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
ReceiverQueueSize: 3,
EnableAutoScaledReceiverQueueSize: true,
})
assert.Nil(t, err)
pc := c.(*consumer).consumers[0]
assert.Equal(t, int32(1), pc.currentQueueSize.Load())
defer c.Close()
// create producer p
p, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer p.Close()
// send a message; this updates scaleReceiverQueueHint from false to true
_, err = p.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
})
assert.NoError(t, err)
// this will trigger the receiver queue size to expand to 2, because we have prefetched 1 message >= currentSize 1
_, err = c.Receive(context.Background())
assert.Nil(t, err)
// currentQueueSize should have doubled
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 2, int(pc.currentQueueSize.Load()))
})
for i := 0; i < 5; i++ {
_, err = p.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
})
assert.NoError(t, err)
// wait for the prefetched message to pass from queueCh to messageCh
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 1, len(pc.messageCh))
})
_, err = p.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
})
assert.NoError(t, err)
// wait until all the messages have been prefetched
_, err = c.Receive(context.Background())
assert.Nil(t, err)
_, err = c.Receive(context.Background())
assert.Nil(t, err)
// this will not trigger the receiver queue size to expand, because we have prefetched 2 messages < currentSize 4
assert.Equal(t, int32(2), pc.currentQueueSize.Load())
}
for i := 0; i < 5; i++ {
p.SendAsync(
context.Background(),
&ProducerMessage{Payload: []byte("hello")},
func(id MessageID, producerMessage *ProducerMessage, err error) {
},
)
}
retryAssert(t, 3, 300, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 3, int(pc.currentQueueSize.Load()))
})
}
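// TestConsumerNonDurable verifies that a NonDurable subscription does not retain messages published
// while no consumer is subscribed.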
func TestConsumerNonDurable(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topicName := newTopicName()
ctx := context.Background()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
SubscriptionMode: NonDurable,
})
assert.Nil(t, err)
i := 1
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
consumer.Ack(msg)
consumer.Close()
i++
// send a message. Pulsar should delete it as there is no active subscription
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}
i++
// Subscribe again
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
SubscriptionMode: NonDurable,
})
assert.Nil(t, err)
defer consumer.Close()
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
}
msg, err = consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}
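// TestConsumerBatchIndexAckDisabled verifies that with batch index acknowledgment disabled (the
// default), acking every message in a batch is sufficient and nothing is redelivered after re-subscribing.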
func TestConsumerBatchIndexAckDisabled(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
for i := 0; i < 5; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-%d", i)),
}, nil)
}
for i := 0; i < 5; i++ {
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
consumer.Ack(message)
}
consumer.Close()
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
_, err = producer.Send(context.Background(), &ProducerMessage{Payload: []byte("done")})
assert.Nil(t, err)
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
assert.Equal(t, []byte("done"), message.Payload())
}
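// TestConsumerMemoryLimit checks that when a client exceeds its memory limit, its consumers shrink
// their receiver queues, new consumers do not expand theirs, and its producers fail with
// ErrMemoryBufferIsFull.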
func TestConsumerMemoryLimit(t *testing.T) {
// Create client 1 without memory limit
cli1, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer cli1.Close()
// Create client 2 with a memory limit
cli2, err := NewClient(ClientOptions{
URL: lookupURL,
MemoryLimitBytes: 10 * 1024,
})
assert.Nil(t, err)
defer cli2.Close()
topic := newTopicName()
// Use client 1 to create producer p1
p1, err := cli1.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer p1.Close()
// Use mem-limited client 2 to create consumer c1
c1, err := cli2.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub-1",
Type: Exclusive,
EnableAutoScaledReceiverQueueSize: true,
})
assert.Nil(t, err)
defer c1.Close()
pc1 := c1.(*consumer).consumers[0]
// Fill up the messageCh of c1
for i := 0; i < 10; i++ {
p1.SendAsync(
context.Background(),
&ProducerMessage{Payload: createTestMessagePayload(1)},
func(id MessageID, producerMessage *ProducerMessage, err error) {
},
)
}
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 10, len(pc1.messageCh))
})
// Get current receiver queue size of c1
prevQueueSize := pc1.currentQueueSize.Load()
// Exceed client 2's memory limit by sending a large message from p1
_, err = p1.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(10*1024 + 1),
})
assert.NoError(t, err)
// c1 should shrink its receiver queue size
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, prevQueueSize/2, pc1.currentQueueSize.Load())
})
// Use mem-limited client 2 to create consumer c2
c2, err := cli2.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub-2",
Type: Exclusive,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
EnableAutoScaledReceiverQueueSize: true,
})
assert.Nil(t, err)
defer c2.Close()
pc2 := c2.(*consumer).consumers[0]
// Try to induce c2 receiver queue size expansion
for i := 0; i < 10; i++ {
p1.SendAsync(
context.Background(),
&ProducerMessage{Payload: createTestMessagePayload(1)},
func(id MessageID, producerMessage *ProducerMessage, err error) {
},
)
}
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 10, len(pc1.messageCh))
})
// c2's receiver queue size should not expand because client 2's memory limit has been exceeded
assert.Equal(t, 1, int(pc2.currentQueueSize.Load()))
// Use mem-limited client 2 to create producer p2
p2, err := cli2.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
DisableBlockIfQueueFull: true,
})
assert.Nil(t, err)
defer p2.Close()
_, err = p2.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(1),
})
// The producer can't send the message because the memory buffer is full
assert.Equal(t, true, errors.Is(err, ErrMemoryBufferIsFull))
}
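// TestMultiConsumerMemoryLimit checks that when the shared client memory limit is exceeded, all
// consumers created from that client shrink their receiver queue sizes.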
func TestMultiConsumerMemoryLimit(t *testing.T) {
// Create client 1 without memory limit
cli1, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer cli1.Close()
// Create client 2 with a memory limit
cli2, err := NewClient(ClientOptions{
URL: lookupURL,
MemoryLimitBytes: 10 * 1024,
})
assert.Nil(t, err)
defer cli2.Close()
topic := newTopicName()
// Use client 1 to create producer p1
p1, err := cli1.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer p1.Close()
// Use mem-limited client 2 to create consumer c1
c1, err := cli2.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub-1",
Type: Exclusive,
EnableAutoScaledReceiverQueueSize: true,
})
assert.Nil(t, err)
defer c1.Close()
pc1 := c1.(*consumer).consumers[0]
// Use mem-limited client 2 to create consumer c2
c2, err := cli2.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub-2",
Type: Exclusive,
EnableAutoScaledReceiverQueueSize: true,
})
assert.Nil(t, err)
defer c2.Close()
pc2 := c2.(*consumer).consumers[0]
// Fill up the messageCh of c1 and c2
for i := 0; i < 10; i++ {
p1.SendAsync(
context.Background(),
&ProducerMessage{Payload: createTestMessagePayload(1)},
func(id MessageID, producerMessage *ProducerMessage, err error) {
},
)
}
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 10, len(pc1.messageCh))
})
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 10, len(pc2.messageCh))
})
// Get current receiver queue size of c1 and c2
pc1PrevQueueSize := pc1.currentQueueSize.Load()
pc2PrevQueueSize := pc2.currentQueueSize.Load()
// Exceed client 2's memory limit by sending a large message from p1
_, err = p1.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(10*1024 + 1),
})
assert.NoError(t, err)
// c1 should shrink its receiver queue size
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, pc1PrevQueueSize/2, pc1.currentQueueSize.Load())
})
// c2 should shrink its receiver queue size too
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load())
})
}