blob: 08d2cf221ead67ad7558153b1482410b1e710499 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/stretchr/testify/assert"
)
// TestGetMessagesByID publishes batched messages to a fresh topic and checks
// that GetMessagesByID returns a whole batch when queried with that batch's
// ledger and entry IDs, and returns an error for a position that was never
// written.
func TestGetMessagesByID(t *testing.T) {
	randomName := newTopicName()
	topic := "persistent://public/default/" + randomName

	cfg := &config.Config{}
	admin, err := New(cfg)
	// Stop early on setup failures: continuing would only dereference nil values.
	if !assert.NoError(t, err) || !assert.NotNil(t, admin) {
		return
	}

	ctx := context.Background()

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: lookupURL,
	})
	if !assert.NoError(t, err) {
		return
	}
	defer client.Close()

	// create producer
	numberMessages := 10
	batchingMaxMessages := numberMessages / 2
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                   topic,
		DisableBatching:         false,
		BatchingMaxMessages:     uint(batchingMaxMessages),
		BatchingMaxPublishDelay: time.Second * 60,
	})
	if !assert.NoError(t, err) {
		return
	}
	defer producer.Close()

	var wg sync.WaitGroup
	wg.Add(numberMessages)
	// Number of acknowledged messages per message-ID string.
	messageIDMap := make(map[string]int32)
	// One message more than numberMessages is sent, but only the first
	// numberMessages are waited for and counted.
	// NOTE(review): the extra message presumably exists to push the last full
	// batch out without waiting for the 60s publish delay — confirm.
	for i := 0; i <= numberMessages; i++ {
		// Computed per iteration so the callback never reads the loop variable.
		counted := i < numberMessages
		producer.SendAsync(ctx, &pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		}, func(id pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
			if !counted {
				// The extra message must not touch the WaitGroup (the counter
				// would go negative) nor the map (it may complete after Wait).
				return
			}
			defer wg.Done()
			// Only use id when the send succeeded.
			if assert.NoError(t, err) {
				messageIDMap[id.String()]++
			}
		})
	}
	wg.Wait()

	topicName, err := utils.GetTopicName(topic)
	if !assert.NoError(t, err) {
		return
	}

	for id, count := range messageIDMap {
		// Every ID is expected to have been reported once per message of a full batch.
		assert.Equal(t, int32(batchingMaxMessages), count)
		messageID, err := utils.ParseMessageID(id)
		if !assert.NoError(t, err) {
			continue
		}
		messages, err := admin.Subscriptions().GetMessagesByID(*topicName, messageID.LedgerID, messageID.EntryID)
		assert.NoError(t, err)
		assert.Equal(t, batchingMaxMessages, len(messages))
	}

	// The string below is only the failure message printed when no error is
	// returned; the text of the returned error is not inspected.
	_, err = admin.Subscriptions().GetMessagesByID(*topicName, 1024, 2048)
	assert.Errorf(t, err, "Message id not found")
}
// TestPeekMessageForPartitionedTopic creates a partitioned topic with a
// subscription, publishes unbatched messages to it, and checks that messages
// peeked from each partition report that partition's index in their message ID.
func TestPeekMessageForPartitionedTopic(t *testing.T) {
	const numPartitions = 2

	ctx := context.Background()
	randomName := newTopicName()
	topic := "persistent://public/default/" + randomName
	topicName, err := utils.GetTopicName(topic)
	// Stop early on setup failures: continuing would only dereference nil values.
	if !assert.NoError(t, err) {
		return
	}
	subName := "test-sub"

	cfg := &config.Config{}
	admin, err := New(cfg)
	if !assert.NoError(t, err) || !assert.NotNil(t, admin) {
		return
	}

	err = admin.Topics().Create(*topicName, numPartitions)
	assert.NoError(t, err)

	err = admin.Subscriptions().Create(*topicName, subName, utils.Earliest)
	assert.NoError(t, err)

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: lookupURL,
	})
	if !assert.NoError(t, err) {
		return
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:           topic,
		DisableBatching: true,
	})
	if !assert.NoError(t, err) {
		return
	}
	defer producer.Close()

	for i := 0; i < 100; i++ {
		producer.SendAsync(ctx, &pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		}, nil)
	}
	// A failed flush must fail the test; returning silently would report a
	// pass without having verified anything.
	if err := producer.FlushWithCtx(ctx); !assert.NoError(t, err) {
		return
	}

	for i := 0; i < numPartitions; i++ {
		topicWithPartition := fmt.Sprintf("%s-partition-%d", topic, i)
		partitionName, err := utils.GetTopicName(topicWithPartition)
		if !assert.NoError(t, err) {
			continue
		}
		messages, err := admin.Subscriptions().PeekMessages(*partitionName, subName, 10)
		assert.NoError(t, err)
		assert.NotNil(t, messages)
		for _, msg := range messages {
			assert.Equal(t, i, msg.GetMessageID().PartitionIndex)
		}
	}
}
// TestPeekMessagesWithProperties publishes messages carrying user properties,
// once with batching disabled and once with batching enabled, and checks that
// PeekMessages returns those properties unchanged together with the
// "publish-time" property and, for batched messages, the BatchHeader property.
func TestPeekMessagesWithProperties(t *testing.T) {
	tests := map[string]struct {
		batched bool
	}{
		"non-batched": {
			batched: false,
		},
		"batched": {
			batched: true,
		},
	}

	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			ctx := context.Background()
			randomName := newTopicName()
			topic := "persistent://public/default/" + randomName
			topicName, err := utils.GetTopicName(topic)
			// Stop early on setup failures: continuing would only dereference nil values.
			if !assert.NoError(t, err) {
				return
			}
			// NOTE(review): nothing in this test creates the subscription before
			// it is peeked — confirm that is intended.
			subName := "test-sub"

			cfg := &config.Config{}
			admin, err := New(cfg)
			if !assert.NoError(t, err) || !assert.NotNil(t, admin) {
				return
			}

			client, err := pulsar.NewClient(pulsar.ClientOptions{
				URL: lookupURL,
			})
			if !assert.NoError(t, err) {
				return
			}
			defer client.Close()

			// Both variants share the topic; the batching limits are only set
			// when batching is enabled.
			batchSize := 5
			producerOpts := pulsar.ProducerOptions{
				Topic:           topic,
				DisableBatching: !tc.batched,
			}
			if tc.batched {
				producerOpts.BatchingMaxMessages = uint(batchSize)
				producerOpts.BatchingMaxPublishDelay = time.Second * 2
			}
			producer, err := client.CreateProducer(producerOpts)
			if !assert.NoError(t, err) {
				return
			}
			defer producer.Close()

			props := map[string]string{
				"key1":        "value1",
				"KEY2":        "VALUE2",
				"KeY3":        "VaLuE3",
				"details=man": "good at playing basketball",
			}

			var wg sync.WaitGroup
			numberOfMessagesToWaitFor := 10
			numberOfMessagesToSend := numberOfMessagesToWaitFor
			if tc.batched {
				// If batched send one extra message to cause the batch to be sent immediately
				numberOfMessagesToSend++
			}
			wg.Add(numberOfMessagesToWaitFor)

			for i := 0; i < numberOfMessagesToSend; i++ {
				// Decide now whether this send is waited for. The callback runs
				// later, and reading the loop variable from it would deadlock
				// wg.Wait under pre-1.22 loop-variable semantics.
				counted := i < numberOfMessagesToWaitFor
				producer.SendAsync(ctx, &pulsar.ProducerMessage{
					Payload:    []byte("test-message"),
					Properties: props,
				}, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
					assert.Nil(t, err)
					if counted {
						wg.Done()
					}
				})
			}
			wg.Wait()

			// Peek messages
			messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, batchSize)
			assert.NoError(t, err)
			assert.NotNil(t, messages)
			assert.Len(t, messages, batchSize)

			// Verify properties of messages
			for _, msg := range messages {
				assert.Equal(t, "value1", msg.Properties["key1"])
				assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
				assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
				assert.Equal(t, "good at playing basketball", msg.Properties["details=man"])

				// Standard pulsar properties, set by pulsar
				assert.NotEmpty(t, msg.Properties["publish-time"])
				if tc.batched {
					assert.NotEmpty(t, msg.Properties[BatchHeader])
					assert.Equal(t, strconv.Itoa(batchSize), msg.Properties[BatchHeader])
				}
			}
		})
	}
}
// TestGetMessageByID publishes a single message and checks that
// GetMessageByID, queried with the ledger and entry IDs returned by the
// producer, yields a message carrying those same IDs.
func TestGetMessageByID(t *testing.T) {
	randomName := newTopicName()
	topic := "persistent://public/default/" + randomName

	cfg := &config.Config{}
	admin, err := New(cfg)
	// Stop early on setup failures: continuing would only dereference nil values.
	if !assert.NoError(t, err) || !assert.NotNil(t, admin) {
		return
	}

	ctx := context.Background()

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: lookupURL,
	})
	if !assert.NoError(t, err) {
		return
	}
	defer client.Close()

	// create producer
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:           topic,
		DisableBatching: false,
	})
	if !assert.NoError(t, err) {
		return
	}
	defer producer.Close()

	// Send is synchronous, so the returned ID is usable right away.
	messageID, err := producer.Send(ctx, &pulsar.ProducerMessage{
		Payload: []byte("hello"),
	})
	if !assert.NoError(t, err) {
		return
	}

	topicName, err := utils.GetTopicName(topic)
	if !assert.NoError(t, err) {
		return
	}

	id, err := admin.Subscriptions().GetMessageByID(*topicName, messageID.LedgerID(), messageID.EntryID())
	// Do not inspect the result of a failed lookup.
	if !assert.NoError(t, err) {
		return
	}
	// The producer's IDs are the expected values; the admin lookup is the actual.
	assert.Equal(t, messageID.LedgerID(), id.MessageID.LedgerID)
	assert.Equal(t, messageID.EntryID(), id.MessageID.EntryID)
}