blob: 8493cf76d95857a15c921f63597599016b2ad0dc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"context"
"fmt"
"log"
"strings"
"testing"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
var (
lookupURL = "pulsar://localhost:6650"
)
func TestCreateTopic(t *testing.T) {
checkError := func(err error) {
if err != nil {
t.Error(err)
}
}
cfg := &config.Config{}
admin, err := New(cfg)
checkError(err)
topic := "persistent://public/default/testCreateTopic"
topicName, err := utils.GetTopicName(topic)
checkError(err)
err = admin.Topics().Create(*topicName, 0)
checkError(err)
topicLists, err := admin.Namespaces().GetTopics("public/default")
checkError(err)
for _, t := range topicLists {
if t == topic {
return
}
}
t.Error("Couldn't find topic: " + topic)
}
func TestTopics_Properties(t *testing.T) {
t.Run("NonPartitioned", func(t *testing.T) {
internalTestTopicsProperties(t, 0)
})
t.Run("Partitioned", func(t *testing.T) {
internalTestTopicsProperties(t, 4)
})
}
func verifyTopicProperties(t *testing.T, admin Client, topic *utils.TopicName,
expected map[string]string) {
properties, err := admin.Topics().GetProperties(*topic)
assert.NoError(t, err)
assert.Equal(t, expected, properties)
}
func internalTestTopicsProperties(t *testing.T, partitions int) {
topic := newTopicName()
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().CreateWithProperties(*topicName, partitions, map[string]string{
"key1": "value1",
})
assert.NoError(t, err)
verifyTopicProperties(t, admin, topicName, map[string]string{"key1": "value1"})
properties, err := admin.Topics().GetProperties(*topicName)
assert.NoError(t, err)
assert.Equal(t, properties["key1"], "value1")
newProperties := map[string]string{
"key1": "value1-updated",
"key2": "value2",
}
err = admin.Topics().UpdateProperties(*topicName, newProperties)
assert.NoError(t, err)
verifyTopicProperties(t, admin, topicName, newProperties)
err = admin.Topics().UpdateProperties(*topicName, map[string]string{"key3": "value3"})
assert.NoError(t, err)
verifyTopicProperties(t, admin, topicName, map[string]string{
"key1": "value1-updated",
"key2": "value2",
"key3": "value3",
})
err = admin.Topics().RemoveProperty(*topicName, "key1")
assert.NoError(t, err)
verifyTopicProperties(t, admin, topicName, map[string]string{
"key2": "value2",
"key3": "value3",
})
}
func TestPartitionState(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
// Create partition topic
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Send message
ctx := context.Background()
// create consumer
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
subName := "my-sub"
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: pulsar.Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()
// create producer
producerName := "test-producer"
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: false,
Name: producerName,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
}); err != nil {
log.Fatal(err)
}
}
stats, err := admin.Topics().GetPartitionedStatsWithOption(*topicName, true, utils.GetStatsOptions{
GetPreciseBacklog: false,
SubscriptionBacklogSize: false,
GetEarliestTimeInBacklog: false,
ExcludePublishers: true,
ExcludeConsumers: true,
})
assert.Nil(t, err)
assert.Equal(t, len(stats.Publishers), 0)
for _, topicStats := range stats.Partitions {
assert.Equal(t, len(topicStats.Publishers), 0)
for _, subscriptionStats := range topicStats.Subscriptions {
assert.Equal(t, len(subscriptionStats.Consumers), 0)
}
}
for _, subscriptionStats := range stats.Subscriptions {
assert.Equal(t, len(subscriptionStats.Consumers), 0)
}
partition, err := topicName.GetPartition(0)
assert.Nil(t, err)
topicState, err := admin.Topics().GetStats(*partition)
assert.Nil(t, err)
assert.Equal(t, len(topicState.Publishers), 1)
publisher := topicState.Publishers[0]
assert.Equal(t, publisher.AccessModel, utils.ProduceModeShared)
assert.Equal(t, publisher.IsSupportsPartialProducer, false)
assert.Equal(t, publisher.ProducerName, producerName)
assert.Contains(t, publisher.Address, "127.0.0.1")
assert.Contains(t, publisher.ClientVersion, "Pulsar-Go-version")
assert.NotContains(t, publisher.ClientVersion, " ")
sub := topicState.Subscriptions[subName]
assert.Equal(t, sub.BytesOutCounter, int64(0))
assert.Equal(t, sub.MsgOutCounter, int64(0))
assert.Equal(t, sub.MessageAckRate, float64(0))
assert.Equal(t, sub.ChunkedMessageRate, float64(0))
assert.Equal(t, sub.BacklogSize, int64(0))
assert.Equal(t, sub.EarliestMsgPublishTimeInBacklog, int64(0))
assert.Equal(t, sub.LastExpireTimestamp, int64(0))
assert.Equal(t, sub.TotalMsgExpired, int64(0))
assert.Equal(t, sub.LastMarkDeleteAdvancedTimestamp, int64(0))
assert.Equal(t, sub.IsDurable, true)
assert.Equal(t, sub.AllowOutOfOrderDelivery, false)
assert.Equal(t, sub.ConsumersAfterMarkDeletePosition, map[string]string{})
assert.Equal(t, sub.NonContiguousDeletedMessagesRanges, 0)
assert.Equal(t, sub.NonContiguousDeletedMessagesRangesSrzSize, 0)
assert.Equal(t, sub.DelayedMessageIndexSizeInBytes, int64(0))
assert.Equal(t, sub.SubscriptionProperties, map[string]string{})
assert.Equal(t, sub.FilterProcessedMsgCount, int64(0))
assert.Equal(t, sub.FilterAcceptedMsgCount, int64(0))
assert.Equal(t, sub.FilterRejectedMsgCount, int64(0))
assert.Equal(t, sub.FilterRescheduledMsgCount, int64(0))
assert.Equal(t, len(sub.Consumers), 1)
consumerState := sub.Consumers[0]
assert.Equal(t, consumerState.BytesOutCounter, int64(0))
assert.Equal(t, consumerState.MsgOutCounter, int64(0))
assert.Equal(t, consumerState.MessageAckRate, float64(0))
assert.Equal(t, consumerState.ChunkedMessageRate, float64(0))
assert.Equal(t, consumerState.AvgMessagesPerEntry, int(0))
assert.Contains(t, consumerState.Address, "127.0.0.1")
assert.Contains(t, consumerState.ClientVersion, "Pulsar-Go-version")
assert.NotContains(t, consumerState.ClientVersion, " ")
assert.Equal(t, consumerState.LastAckedTimestamp, int64(0))
assert.Equal(t, consumerState.LastConsumedTimestamp, int64(0))
assert.True(t, consumerState.LastConsumedFlowTimestamp > 0)
}
func TestNonPartitionState(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
// Create non-partition topic
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 0)
assert.NoError(t, err)
// Send message
ctx := context.Background()
// create consumer
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: pulsar.Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()
// create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()
// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
}); err != nil {
log.Fatal(err)
}
}
stats, err := admin.Topics().GetStatsWithOption(*topicName, utils.GetStatsOptions{
GetPreciseBacklog: false,
SubscriptionBacklogSize: false,
GetEarliestTimeInBacklog: false,
ExcludePublishers: true,
ExcludeConsumers: true,
})
assert.Nil(t, err)
assert.Equal(t, len(stats.Publishers), 0)
for _, subscriptionStats := range stats.Subscriptions {
assert.Equal(t, len(subscriptionStats.Consumers), 0)
}
}
func newTopicName() string {
return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
}
func TestDeleteNonPartitionedTopic(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 0)
assert.NoError(t, err)
err = admin.Topics().Delete(*topicName, false, true)
assert.NoError(t, err)
topicList, err := admin.Namespaces().GetTopics("public/default")
assert.NoError(t, err)
isTopicExist := false
for _, topicIterator := range topicList {
if topicIterator == topic {
isTopicExist = true
}
}
assert.Equal(t, false, isTopicExist)
}
func TestDeletePartitionedTopic(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 3)
assert.NoError(t, err)
err = admin.Topics().Delete(*topicName, false, false)
assert.NoError(t, err)
topicList, err := admin.Namespaces().GetTopics("public/default")
assert.NoError(t, err)
isTopicExist := false
for _, topicIterator := range topicList {
if topicIterator == topic {
isTopicExist = true
}
}
assert.Equal(t, false, isTopicExist)
}
func TestUpdateTopicPartitions(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 3)
assert.NoError(t, err)
topicMetadata, err := admin.Topics().GetMetadata(*topicName)
assert.NoError(t, err)
assert.Equal(t, 3, topicMetadata.Partitions)
err = admin.Topics().Update(*topicName, 4)
assert.NoError(t, err)
topicMetadata, err = admin.Topics().GetMetadata(*topicName)
assert.NoError(t, err)
assert.Equal(t, 4, topicMetadata.Partitions)
}
func TestGetMessageID(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
topicPartitionZero := topic + "-partition-0"
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
topicPartitionZeroName, err := utils.GetTopicName(topicPartitionZero)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 1)
assert.NoError(t, err)
ctx := context.Background()
// create consumer
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: pulsar.Exclusive,
})
assert.NoError(t, err)
defer consumer.Close()
// create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.NoError(t, err)
defer producer.Close()
_, err = producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte("hello"),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
})
assert.NoError(t, err)
// ack message
msg, err := consumer.Receive(ctx)
assert.NoError(t, err)
assert.Equal(t, []byte("hello"), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
err = consumer.Ack(msg)
assert.NoError(t, err)
messageID, err := admin.Topics().GetMessageID(
*topicPartitionZeroName,
msg.PublishTime().Unix()*1000-1000,
)
assert.NoError(t, err)
assert.Equal(t, msg.ID().EntryID(), messageID.EntryID)
assert.Equal(t, msg.ID().LedgerID(), messageID.LedgerID)
assert.Equal(t, int(msg.ID().PartitionIdx()), messageID.PartitionIndex)
}
func TestMessageTTL(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
messageTTL, err := admin.Topics().GetMessageTTL(*topicName)
assert.NoError(t, err)
assert.Equal(t, -1, messageTTL)
err = admin.Topics().SetMessageTTL(*topicName, 600)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
messageTTL, err = admin.Topics().GetMessageTTL(*topicName)
return err == nil && messageTTL == 600
},
10*time.Second,
100*time.Millisecond,
)
err = admin.Topics().RemoveMessageTTL(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
messageTTL, err = admin.Topics().GetMessageTTL(*topicName)
return err == nil && messageTTL == -1
},
10*time.Second,
100*time.Millisecond,
)
}
func TestRetention(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
topicRetentionPolicy, err := admin.Topics().GetRetention(*topicName, false)
assert.NoError(t, err)
assert.Equal(t, int64(0), topicRetentionPolicy.RetentionSizeInMB)
assert.Equal(t, 0, topicRetentionPolicy.RetentionTimeInMinutes)
err = admin.Topics().SetRetention(*topicName, utils.RetentionPolicies{
RetentionSizeInMB: 20480,
RetentionTimeInMinutes: 1440,
})
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
topicRetentionPolicy, err = admin.Topics().GetRetention(*topicName, false)
return err == nil &&
topicRetentionPolicy.RetentionSizeInMB == int64(20480) &&
topicRetentionPolicy.RetentionTimeInMinutes == 1440
},
10*time.Second,
100*time.Millisecond,
)
err = admin.Topics().RemoveRetention(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
topicRetentionPolicy, err = admin.Topics().GetRetention(*topicName, false)
return err == nil &&
topicRetentionPolicy.RetentionSizeInMB == int64(0) &&
topicRetentionPolicy.RetentionTimeInMinutes == 0
},
10*time.Second,
100*time.Millisecond,
)
}
func TestSubscribeRate(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default subscribe rate (adapt to actual server behavior)
initialSubscribeRate, err := admin.Topics().GetSubscribeRate(*topicName)
assert.NoError(t, err)
// Store initial values for later comparison instead of assuming specific defaults
initialConsumerRate := initialSubscribeRate.SubscribeThrottlingRatePerConsumer
initialRatePeriod := initialSubscribeRate.RatePeriodInSecond
// Set new subscribe rate
newSubscribeRate := utils.SubscribeRate{
SubscribeThrottlingRatePerConsumer: 10,
RatePeriodInSecond: 60,
}
err = admin.Topics().SetSubscribeRate(*topicName, newSubscribeRate)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
subscribeRate, err := admin.Topics().GetSubscribeRate(*topicName)
return err == nil &&
subscribeRate.SubscribeThrottlingRatePerConsumer == 10 &&
subscribeRate.RatePeriodInSecond == 60
},
10*time.Second,
100*time.Millisecond,
)
// Remove subscribe rate policy
err = admin.Topics().RemoveSubscribeRate(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
subscribeRate, err := admin.Topics().GetSubscribeRate(*topicName)
return err == nil &&
subscribeRate.SubscribeThrottlingRatePerConsumer == initialConsumerRate &&
subscribeRate.RatePeriodInSecond == initialRatePeriod
},
10*time.Second,
100*time.Millisecond,
)
}
func TestSubscriptionDispatchRate(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default subscription dispatch rate (adapt to actual server behavior)
initialDispatchRate, err := admin.Topics().GetSubscriptionDispatchRate(*topicName)
assert.NoError(t, err)
// Store initial values for later comparison instead of assuming specific defaults
initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg
initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte
initialRatePeriod := initialDispatchRate.RatePeriodInSecond
initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate
// Set new subscription dispatch rate
newDispatchRate := utils.DispatchRateData{
DispatchThrottlingRateInMsg: 1000,
DispatchThrottlingRateInByte: 1048576, // 1MB
RatePeriodInSecond: 30,
RelativeToPublishRate: true,
}
err = admin.Topics().SetSubscriptionDispatchRate(*topicName, newDispatchRate)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
dispatchRate, err := admin.Topics().GetSubscriptionDispatchRate(*topicName)
return err == nil &&
dispatchRate.DispatchThrottlingRateInMsg == 1000 &&
dispatchRate.DispatchThrottlingRateInByte == 1048576 &&
dispatchRate.RatePeriodInSecond == 30 &&
dispatchRate.RelativeToPublishRate == true
},
10*time.Second,
100*time.Millisecond,
)
// Remove subscription dispatch rate policy
err = admin.Topics().RemoveSubscriptionDispatchRate(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
dispatchRate, err := admin.Topics().GetSubscriptionDispatchRate(*topicName)
return err == nil &&
dispatchRate.DispatchThrottlingRateInMsg == initialMsgRate &&
dispatchRate.DispatchThrottlingRateInByte == initialByteRate &&
dispatchRate.RatePeriodInSecond == initialRatePeriod &&
dispatchRate.RelativeToPublishRate == initialRelativeToPublish
},
10*time.Second,
100*time.Millisecond,
)
}
func TestMaxConsumersPerSubscription(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default max consumers per subscription
maxConsumers, err := admin.Topics().GetMaxConsumersPerSubscription(*topicName)
assert.NoError(t, err)
assert.Equal(t, -1, maxConsumers)
// Set new max consumers per subscription
err = admin.Topics().SetMaxConsumersPerSubscription(*topicName, 10)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
maxConsumers, err = admin.Topics().GetMaxConsumersPerSubscription(*topicName)
return err == nil && maxConsumers == 10
},
10*time.Second,
100*time.Millisecond,
)
// Remove max consumers per subscription policy
err = admin.Topics().RemoveMaxConsumersPerSubscription(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
maxConsumers, err = admin.Topics().GetMaxConsumersPerSubscription(*topicName)
return err == nil && maxConsumers == -1
},
10*time.Second,
100*time.Millisecond,
)
}
func TestMaxMessageSize(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default max message size
maxMessageSize, err := admin.Topics().GetMaxMessageSize(*topicName)
assert.NoError(t, err)
assert.Equal(t, -1, maxMessageSize)
// Set new max message size (1MB)
err = admin.Topics().SetMaxMessageSize(*topicName, 1048576)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
maxMessageSize, err = admin.Topics().GetMaxMessageSize(*topicName)
return err == nil && maxMessageSize == 1048576
},
10*time.Second,
100*time.Millisecond,
)
// Remove max message size policy
err = admin.Topics().RemoveMaxMessageSize(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
maxMessageSize, err = admin.Topics().GetMaxMessageSize(*topicName)
return err == nil && maxMessageSize == -1
},
10*time.Second,
100*time.Millisecond,
)
}
func TestMaxSubscriptionsPerTopic(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default max subscriptions per topic
maxSubscriptions, err := admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
assert.NoError(t, err)
assert.Equal(t, -1, maxSubscriptions)
// Set new max subscriptions per topic
err = admin.Topics().SetMaxSubscriptionsPerTopic(*topicName, 100)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
maxSubscriptions, err = admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
return err == nil && maxSubscriptions == 100
},
10*time.Second,
100*time.Millisecond,
)
// Remove max subscriptions per topic policy
err = admin.Topics().RemoveMaxSubscriptionsPerTopic(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
maxSubscriptions, err = admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
return err == nil && maxSubscriptions == -1
},
10*time.Second,
100*time.Millisecond,
)
}
func TestSchemaValidationEnforced(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default schema validation enforced
schemaValidationEnforced, err := admin.Topics().GetSchemaValidationEnforced(*topicName)
if err != nil {
// Skip test if API is not available (e.g., 405 Method Not Allowed)
if strings.Contains(err.Error(), "405") || strings.Contains(err.Error(), "Method Not Allowed") {
t.Skip("SchemaValidationEnforced API not available on this Pulsar version")
return
}
assert.NoError(t, err)
}
initialValidationEnforced := schemaValidationEnforced
// Set schema validation enforced to true
err = admin.Topics().SetSchemaValidationEnforced(*topicName, true)
if err != nil {
// Skip test if API is not available
if strings.Contains(err.Error(), "405") || strings.Contains(err.Error(), "Method Not Allowed") {
t.Skip("SetSchemaValidationEnforced API not available on this Pulsar version")
return
}
assert.NoError(t, err)
}
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
schemaValidationEnforced, err := admin.Topics().GetSchemaValidationEnforced(*topicName)
return err == nil && schemaValidationEnforced == true
},
10*time.Second,
100*time.Millisecond,
)
// Remove schema validation enforced policy
err = admin.Topics().RemoveSchemaValidationEnforced(*topicName)
if err != nil {
// Skip removal check if API is not available
if strings.Contains(err.Error(), "405") || strings.Contains(err.Error(), "Method Not Allowed") {
t.Skip("RemoveSchemaValidationEnforced API not available on this Pulsar version")
return
}
assert.NoError(t, err)
}
assert.Eventually(
t,
func() bool {
schemaValidationEnforced, err := admin.Topics().GetSchemaValidationEnforced(*topicName)
return err == nil && schemaValidationEnforced == initialValidationEnforced
},
10*time.Second,
100*time.Millisecond,
)
}
func TestDeduplicationSnapshotInterval(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default deduplication snapshot interval
interval, err := admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
assert.NoError(t, err)
assert.Equal(t, -1, interval)
// Set new deduplication snapshot interval
err = admin.Topics().SetDeduplicationSnapshotInterval(*topicName, 1000)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
interval, err = admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
return err == nil && interval == 1000
},
10*time.Second,
100*time.Millisecond,
)
// Remove deduplication snapshot interval policy
err = admin.Topics().RemoveDeduplicationSnapshotInterval(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
interval, err = admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
return err == nil && interval == -1
},
10*time.Second,
100*time.Millisecond,
)
}
func TestReplicatorDispatchRate(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default replicator dispatch rate (adapt to actual server behavior)
initialDispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName)
assert.NoError(t, err)
// Store initial values for later comparison instead of assuming specific defaults
initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg
initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte
initialRatePeriod := initialDispatchRate.RatePeriodInSecond
initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate
// Set new replicator dispatch rate
newDispatchRate := utils.DispatchRateData{
DispatchThrottlingRateInMsg: 500,
DispatchThrottlingRateInByte: 524288, // 512KB
RatePeriodInSecond: 60,
RelativeToPublishRate: true,
}
err = admin.Topics().SetReplicatorDispatchRate(*topicName, newDispatchRate)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
dispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName)
return err == nil &&
dispatchRate.DispatchThrottlingRateInMsg == 500 &&
dispatchRate.DispatchThrottlingRateInByte == 524288 &&
dispatchRate.RatePeriodInSecond == 60 &&
dispatchRate.RelativeToPublishRate == true
},
10*time.Second,
100*time.Millisecond,
)
// Remove replicator dispatch rate policy
err = admin.Topics().RemoveReplicatorDispatchRate(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
dispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName)
return err == nil &&
dispatchRate.DispatchThrottlingRateInMsg == initialMsgRate &&
dispatchRate.DispatchThrottlingRateInByte == initialByteRate &&
dispatchRate.RatePeriodInSecond == initialRatePeriod &&
dispatchRate.RelativeToPublishRate == initialRelativeToPublish
},
10*time.Second,
100*time.Millisecond,
)
}
func TestOffloadPolicies(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default offload policies
offloadPolicies, err := admin.Topics().GetOffloadPolicies(*topicName)
assert.NoError(t, err)
// Default values should be empty/default
assert.Equal(t, "", offloadPolicies.ManagedLedgerOffloadDriver)
assert.Equal(t, 0, offloadPolicies.ManagedLedgerOffloadMaxThreads)
// Set new offload policies
newOffloadPolicies := utils.OffloadPolicies{
ManagedLedgerOffloadDriver: "aws-s3",
ManagedLedgerOffloadMaxThreads: 4,
ManagedLedgerOffloadThresholdInBytes: 1073741824, // 1GB
ManagedLedgerOffloadDeletionLagInMillis: 3600000, // 1 hour
ManagedLedgerOffloadAutoTriggerSizeThresholdBytes: 2147483648, // 2GB
S3ManagedLedgerOffloadBucket: "test-bucket",
S3ManagedLedgerOffloadRegion: "us-west-2",
S3ManagedLedgerOffloadServiceEndpoint: "https://s3.amazonaws.com",
}
err = admin.Topics().SetOffloadPolicies(*topicName, newOffloadPolicies)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
offloadPolicies, err = admin.Topics().GetOffloadPolicies(*topicName)
return err == nil &&
offloadPolicies.ManagedLedgerOffloadDriver == "aws-s3" &&
offloadPolicies.ManagedLedgerOffloadMaxThreads == 4 &&
offloadPolicies.ManagedLedgerOffloadThresholdInBytes == 1073741824 &&
offloadPolicies.S3ManagedLedgerOffloadBucket == "test-bucket"
},
10*time.Second,
100*time.Millisecond,
)
// Remove offload policies
err = admin.Topics().RemoveOffloadPolicies(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
offloadPolicies, err = admin.Topics().GetOffloadPolicies(*topicName)
return err == nil &&
offloadPolicies.ManagedLedgerOffloadDriver == "" &&
offloadPolicies.ManagedLedgerOffloadMaxThreads == 0
},
10*time.Second,
100*time.Millisecond,
)
}
func TestAutoSubscriptionCreation(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default auto subscription creation
autoSubCreation, err := admin.Topics().GetAutoSubscriptionCreation(*topicName)
assert.NoError(t, err)
assert.Equal(t, false, autoSubCreation.AllowAutoSubscriptionCreation)
// Set auto subscription creation to true
newAutoSubCreation := utils.AutoSubscriptionCreationOverride{
AllowAutoSubscriptionCreation: true,
}
err = admin.Topics().SetAutoSubscriptionCreation(*topicName, newAutoSubCreation)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
autoSubCreation, err = admin.Topics().GetAutoSubscriptionCreation(*topicName)
return err == nil &&
autoSubCreation.AllowAutoSubscriptionCreation == true
},
10*time.Second,
100*time.Millisecond,
)
// Remove auto subscription creation policy
err = admin.Topics().RemoveAutoSubscriptionCreation(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
autoSubCreation, err = admin.Topics().GetAutoSubscriptionCreation(*topicName)
return err == nil &&
autoSubCreation.AllowAutoSubscriptionCreation == false
},
10*time.Second,
100*time.Millisecond,
)
}
func TestSchemaCompatibilityStrategy(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default schema compatibility strategy (adapt to actual server behavior)
initialStrategy, err := admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
assert.NoError(t, err)
// Server may return empty string instead of "UNDEFINED"
// Set new schema compatibility strategy
err = admin.Topics().SetSchemaCompatibilityStrategy(*topicName, utils.SchemaCompatibilityStrategyBackward)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
strategy, err := admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
return err == nil &&
strategy == utils.SchemaCompatibilityStrategyBackward
},
10*time.Second,
100*time.Millisecond,
)
// Remove schema compatibility strategy policy
err = admin.Topics().RemoveSchemaCompatibilityStrategy(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
strategy, err := admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
return err == nil &&
strategy == initialStrategy
},
10*time.Second,
100*time.Millisecond,
)
}
func TestTopics_MaxProducers(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default (should be -1)
maxProducers, err := admin.Topics().GetMaxProducers(*topicName)
assert.NoError(t, err)
assert.Equal(t, -1, maxProducers)
// Set to 0 explicitly (unlimited)
err = admin.Topics().SetMaxProducers(*topicName, 0)
assert.NoError(t, err)
// Verify returns 0
assert.Eventually(
t,
func() bool {
maxProducers, err = admin.Topics().GetMaxProducers(*topicName)
return err == nil && maxProducers == 0
},
10*time.Second,
100*time.Millisecond,
)
// Set to positive value
err = admin.Topics().SetMaxProducers(*topicName, 10)
assert.NoError(t, err)
// Verify returns value
assert.Eventually(
t,
func() bool {
maxProducers, err = admin.Topics().GetMaxProducers(*topicName)
return err == nil && maxProducers == 10
},
10*time.Second,
100*time.Millisecond,
)
// Remove
err = admin.Topics().RemoveMaxProducers(*topicName)
assert.NoError(t, err)
}
func TestTopics_MaxConsumers(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default (should be -1)
maxConsumers, err := admin.Topics().GetMaxConsumers(*topicName)
assert.NoError(t, err)
assert.Equal(t, -1, maxConsumers)
// Set to 0 explicitly
err = admin.Topics().SetMaxConsumers(*topicName, 0)
assert.NoError(t, err)
// Verify returns 0
assert.Eventually(
t,
func() bool {
maxConsumers, err = admin.Topics().GetMaxConsumers(*topicName)
return err == nil && maxConsumers == 0
},
10*time.Second,
100*time.Millisecond,
)
// Set to positive value
err = admin.Topics().SetMaxConsumers(*topicName, 20)
assert.NoError(t, err)
// Verify returns value
assert.Eventually(
t,
func() bool {
maxConsumers, err = admin.Topics().GetMaxConsumers(*topicName)
return err == nil && maxConsumers == 20
},
10*time.Second,
100*time.Millisecond,
)
// Remove
err = admin.Topics().RemoveMaxConsumers(*topicName)
assert.NoError(t, err)
}
func TestTopics_MaxUnackMessagesPerConsumer(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default (should be -1)
maxUnack, err := admin.Topics().GetMaxUnackMessagesPerConsumer(*topicName)
assert.NoError(t, err)
assert.Equal(t, -1, maxUnack)
// Set to 0 explicitly
err = admin.Topics().SetMaxUnackMessagesPerConsumer(*topicName, 0)
assert.NoError(t, err)
// Verify returns 0
assert.Eventually(
t,
func() bool {
maxUnack, err = admin.Topics().GetMaxUnackMessagesPerConsumer(*topicName)
return err == nil && maxUnack == 0
},
10*time.Second,
100*time.Millisecond,
)
// Set to positive value
err = admin.Topics().SetMaxUnackMessagesPerConsumer(*topicName, 1000)
assert.NoError(t, err)
// Verify returns value
assert.Eventually(
t,
func() bool {
maxUnack, err = admin.Topics().GetMaxUnackMessagesPerConsumer(*topicName)
return err == nil && maxUnack == 1000
},
10*time.Second,
100*time.Millisecond,
)
// Remove
err = admin.Topics().RemoveMaxUnackMessagesPerConsumer(*topicName)
assert.NoError(t, err)
}
func TestTopics_MaxUnackMessagesPerSubscription(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
// Get default (should be -1)
maxUnack, err := admin.Topics().GetMaxUnackMessagesPerSubscription(*topicName)
assert.NoError(t, err)
assert.Equal(t, -1, maxUnack)
// Set to 0 explicitly
err = admin.Topics().SetMaxUnackMessagesPerSubscription(*topicName, 0)
assert.NoError(t, err)
// Verify returns 0
assert.Eventually(
t,
func() bool {
maxUnack, err = admin.Topics().GetMaxUnackMessagesPerSubscription(*topicName)
return err == nil && maxUnack == 0
},
10*time.Second,
100*time.Millisecond,
)
// Set to positive value
err = admin.Topics().SetMaxUnackMessagesPerSubscription(*topicName, 5000)
assert.NoError(t, err)
// Verify returns value
assert.Eventually(
t,
func() bool {
maxUnack, err = admin.Topics().GetMaxUnackMessagesPerSubscription(*topicName)
return err == nil && maxUnack == 5000
},
10*time.Second,
100*time.Millisecond,
)
// Remove
err = admin.Topics().RemoveMaxUnackMessagesPerSubscription(*topicName)
assert.NoError(t, err)
}