blob: ee3ab17760f2bdc81c2308b5ddd7241475bf5319 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
var _brokerMaxMessageSize = 1024 * 1024
func TestInvalidChunkingConfig(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
assert.Nil(t, err)
defer client.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
DisableBatching: false,
EnableChunking: true,
assert.Error(t, err, "producer creation should have fail")
assert.Nil(t, producer)
func TestLargeMessage(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
// create producer without ChunkMaxMessageSize
producer1, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
EnableChunking: true,
assert.NoError(t, err)
assert.NotNil(t, producer1)
defer producer1.Close()
// create producer with ChunkMaxMessageSize
producer2, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
EnableChunking: true,
ChunkMaxMessageSize: 5,
assert.NoError(t, err)
assert.NotNil(t, producer2)
defer producer2.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: "chunk-subscriber",
assert.NoError(t, err)
assert.NotNil(t, consumer)
defer consumer.Close()
expectMsgs := make([][]byte, 0, 10)
// test send chunk with serverMaxMessageSize limit
for i := 0; i < 5; i++ {
msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
expectMsgs = append(expectMsgs, msg)
ID, err := producer1.Send(context.Background(), &ProducerMessage{
Payload: msg,
assert.NoError(t, err)
assert.NotNil(t, ID)
// test receive chunk with serverMaxMessageSize limit
for i := 0; i < 5; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
msg, err := consumer.Receive(ctx)
assert.NoError(t, err)
expectMsg := expectMsgs[i]
assert.Equal(t, expectMsg, msg.Payload())
// ack message
err = consumer.Ack(msg)
assert.NoError(t, err)
// test send chunk with ChunkMaxMessageSize limit
for i := 0; i < 5; i++ {
msg := createTestMessagePayload(50)
expectMsgs = append(expectMsgs, msg)
ID, err := producer2.Send(context.Background(), &ProducerMessage{
Payload: msg,
assert.NoError(t, err)
assert.NotNil(t, ID)
// test receive chunk with ChunkMaxMessageSize limit
for i := 5; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
assert.NoError(t, err)
expectMsg := expectMsgs[i]
assert.Equal(t, expectMsg, msg.Payload())
// ack message
err = consumer.Ack(msg)
assert.NoError(t, err)
func TestMaxPendingChunkMessages(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
EnableChunking: true,
ChunkMaxMessageSize: 10,
assert.NoError(t, err)
assert.NotNil(t, producer)
c, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: "chunk-subscriber",
MaxPendingChunkedMessage: 1,
assert.NoError(t, err)
assert.NotNil(t, c)
defer c.Close()
pc := c.(*consumer).consumers[0]
sendSingleChunk(producer, "0", 0, 2)
// MaxPendingChunkedMessage is 1, the chunked message with uuid 0 will be discarded
sendSingleChunk(producer, "1", 0, 2)
// chunkedMsgCtx with uuid 0 should be discarded
retryAssert(t, 3, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 1, len(pc.chunkedMsgCtxMap.chunkedMsgCtxs))
sendSingleChunk(producer, "1", 1, 2)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
msg, err := c.Receive(ctx)
assert.NoError(t, err)
assert.Equal(t, "chunk-1-0|chunk-1-1|", string(msg.Payload()))
// Ensure that the chunked message of uuid 0 is discarded.
sendSingleChunk(producer, "0", 1, 2)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
msg, err = c.Receive(ctx)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
func TestExpireIncompleteChunks(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
c, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: "chunk-subscriber",
ExpireTimeOfIncompleteChunk: time.Millisecond * 300,
assert.NoError(t, err)
defer c.Close()
uuid := "test-uuid"
chunkCtxMap := c.(*consumer).consumers[0].chunkedMsgCtxMap
chunkCtxMap.addIfAbsent(uuid, 2, 100)
ctx := chunkCtxMap.get(uuid)
assert.NotNil(t, ctx)
time.Sleep(400 * time.Millisecond)
ctx = chunkCtxMap.get(uuid)
assert.Nil(t, ctx)
func TestChunksEnqueueFailed(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
EnableChunking: true,
DisableBatching: true,
MaxPendingMessages: 10,
ChunkMaxMessageSize: 50,
DisableBlockIfQueueFull: true,
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
// Reduce publish rate to prevent the producer sending messages too fast
url := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/publishRate"
makeHTTPCall(t, http.MethodPost, url, "{\"publishThrottlingRateInMsg\": 1,\"publishThrottlingRateInByte\": 1000}")
// Need to wait some time to let the rate limiter take effect
time.Sleep(2 * time.Second)
ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(1000),
assert.Error(t, err)
assert.Nil(t, ID)
func TestSeekChunkMessages(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
totalMessages := 5
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
EnableChunking: true,
DisableBatching: true,
ChunkMaxMessageSize: 50,
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: "default-seek",
assert.NoError(t, err)
assert.NotNil(t, consumer)
defer consumer.Close()
msgIDs := make([]MessageID, 0)
for i := 0; i < totalMessages; i++ {
ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(100),
assert.NoError(t, err)
msgIDs = append(msgIDs, ID)
for i := 0; i < totalMessages; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
msg, err := consumer.Receive(ctx)
assert.NoError(t, err)
assert.NotNil(t, msg)
assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
err = consumer.Seek(msgIDs[1])
assert.NoError(t, err)
for i := 1; i < totalMessages; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
msg, err := consumer.Receive(ctx)
assert.NoError(t, err)
assert.NotNil(t, msg)
assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
// todo: add reader seek test when support reader read chunk message
func TestChunkAckAndNAck(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
EnableChunking: true,
DisableBatching: true,
ChunkMaxMessageSize: 50,
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: "default-seek",
NackRedeliveryDelay: time.Second,
assert.NoError(t, err)
assert.NotNil(t, consumer)
defer consumer.Close()
content := createTestMessagePayload(100)
_, err = producer.Send(context.Background(), &ProducerMessage{
Payload: content,
assert.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
msg, err := consumer.Receive(ctx)
assert.NoError(t, err)
assert.NotNil(t, msg)
assert.Equal(t, msg.Payload(), content)
time.Sleep(time.Second * 2)
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
msg, err = consumer.Receive(ctx)
assert.NoError(t, err)
assert.NotNil(t, msg)
assert.Equal(t, msg.Payload(), content)
func TestChunkSize(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
assert.Nil(t, err)
defer client.Close()
// the default message metadata size for string schema
// The proto messageMetaData size as following. (all with tag) (maxMessageSize = 1024 * 1024)
// | producerName | sequenceID | publishTime | uncompressedSize |
// | ------------ | ---------- | ----------- | ---------------- |
// | 6 | 2 | 7 | 4 |
payloadChunkSize := _brokerMaxMessageSize - 19
topic := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Name: "test",
Topic: topic,
EnableChunking: true,
DisableBatching: true,
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
for size := payloadChunkSize; size <= _brokerMaxMessageSize; size++ {
msgID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(size),
assert.NoError(t, err)
if size <= payloadChunkSize {
_, ok := msgID.(*messageID)
assert.Equal(t, true, ok)
} else {
_, ok := msgID.(*chunkMessageID)
assert.Equal(t, true, ok)
func TestChunkMultiTopicConsumerReceive(t *testing.T) {
topic1 := newTopicName()
topic2 := newTopicName()
client, err := NewClient(ClientOptions{
URL: lookupURL,
if err != nil {
topics := []string{topic1, topic2}
consumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: "multi-topic-sub",
if err != nil {
defer consumer.Close()
maxSize := 50
// produce messages
for i, topic := range topics {
p, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
EnableChunking: true,
ChunkMaxMessageSize: uint(maxSize),
if err != nil {
err = genMessages(p, 10, func(idx int) string {
return fmt.Sprintf("topic-%d-hello-%d-%s", i+1, idx, string(createTestMessagePayload(100)))
if err != nil {
receivedTopic1 := 0
receivedTopic2 := 0
// nolint
for receivedTopic1+receivedTopic2 < 20 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
select {
case cm, ok := <-consumer.Chan():
if ok {
msg := string(cm.Payload())
if strings.HasPrefix(msg, "topic-1") {
} else if strings.HasPrefix(msg, "topic-2") {
} else {
case <-ctx.Done():
assert.Equal(t, receivedTopic1, receivedTopic2)
func TestChunkBlockIfQueueFull(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
if err != nil {
topic := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Name: "test",
Topic: topic,
EnableChunking: true,
DisableBatching: true,
MaxPendingMessages: 1,
ChunkMaxMessageSize: 10,
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
// Large messages will be split into 11 chunks, exceeding the length of pending queue
ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(100),
assert.NoError(t, err)
assert.NotNil(t, ID)
func createTestMessagePayload(size int) []byte {
payload := make([]byte, size)
for i := range payload {
payload[i] = byte(rand.Intn(100))
return payload
func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
msg := &ProducerMessage{
Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)),
producerImpl := p.(*producer).producers[0].(*partitionProducer)
mm := producerImpl.genMetadata(msg, len(msg.Payload), time.Now())
mm.Uuid = proto.String(uuid)
mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
mm.TotalChunkMsgSize = proto.Int32(int32(len(msg.Payload)))
mm.ChunkId = proto.Int32(int32(chunkID))
producerImpl.updateMetadataSeqID(mm, msg)
doneCh := make(chan struct{})
callback: func(id MessageID, producerMessage *ProducerMessage, err error) {
msg: msg,