feat: align topics level policies admin apis to java restful apis (#1398)
* feat: add topic configuration methods for subscribe rate, dispatch rate, max consumers, message size, subscriptions, schema validation, deduplication, replicator dispatch rate, offload policies, auto subscription creation, and schema compatibility strategy
* fix: lint error
* fix: lint
* add tests
* fix tests
* add unit tests
diff --git a/integration-tests/blue-green/docker-compose.yml b/integration-tests/blue-green/docker-compose.yml
index f04e24f..ae5de47 100644
--- a/integration-tests/blue-green/docker-compose.yml
+++ b/integration-tests/blue-green/docker-compose.yml
@@ -130,6 +130,7 @@
- loadBalancerDebugModeEnabled=true
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+ - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
green-zookeeper:
condition: service_healthy
@@ -163,6 +164,7 @@
- loadBalancerDebugModeEnabled=true
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+ - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
green-zookeeper:
condition: service_healthy
diff --git a/integration-tests/clustered/docker-compose.yml b/integration-tests/clustered/docker-compose.yml
index eb96ca9..8eb997e 100644
--- a/integration-tests/clustered/docker-compose.yml
+++ b/integration-tests/clustered/docker-compose.yml
@@ -122,6 +122,7 @@
- advertisedListeners=internal:pulsar://broker-1:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+ - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
zookeeper:
condition: service_healthy
@@ -154,6 +155,7 @@
- advertisedListeners=internal:pulsar://broker-2:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+ - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
zookeeper:
condition: service_healthy
diff --git a/integration-tests/extensible-load-manager/docker-compose.yml b/integration-tests/extensible-load-manager/docker-compose.yml
index 063a235..d2a88fb 100644
--- a/integration-tests/extensible-load-manager/docker-compose.yml
+++ b/integration-tests/extensible-load-manager/docker-compose.yml
@@ -129,6 +129,7 @@
- clusterMigrationCheckDurationSeconds=1
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+ - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
zookeeper:
condition: service_healthy
@@ -168,6 +169,7 @@
- clusterMigrationCheckDurationSeconds=1
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+ - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
zookeeper:
condition: service_healthy
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index c7fe222..4d6fff1 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -389,6 +389,107 @@
// @param data
// list of replication cluster id
SetReplicationClusters(topic utils.TopicName, data []string) error
+
+ // GetSubscribeRate Get subscribe rate configuration for a topic
+ GetSubscribeRate(utils.TopicName) (*utils.SubscribeRate, error)
+
+ // SetSubscribeRate Set subscribe rate configuration for a topic
+ SetSubscribeRate(utils.TopicName, utils.SubscribeRate) error
+
+ // RemoveSubscribeRate Remove subscribe rate configuration for a topic
+ RemoveSubscribeRate(utils.TopicName) error
+
+ // GetSubscriptionDispatchRate Get subscription dispatch rate for a topic
+ GetSubscriptionDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
+
+ // SetSubscriptionDispatchRate Set subscription dispatch rate for a topic
+ SetSubscriptionDispatchRate(utils.TopicName, utils.DispatchRateData) error
+
+ // RemoveSubscriptionDispatchRate Remove subscription dispatch rate for a topic
+ RemoveSubscriptionDispatchRate(utils.TopicName) error
+
+ // GetMaxConsumersPerSubscription Get max consumers per subscription for a topic
+ GetMaxConsumersPerSubscription(utils.TopicName) (int, error)
+
+ // SetMaxConsumersPerSubscription Set max consumers per subscription for a topic
+ SetMaxConsumersPerSubscription(utils.TopicName, int) error
+
+ // RemoveMaxConsumersPerSubscription Remove max consumers per subscription for a topic
+ RemoveMaxConsumersPerSubscription(utils.TopicName) error
+
+ // GetMaxMessageSize Get max message size for a topic
+ GetMaxMessageSize(utils.TopicName) (int, error)
+
+ // SetMaxMessageSize Set max message size for a topic
+ SetMaxMessageSize(utils.TopicName, int) error
+
+ // RemoveMaxMessageSize Remove max message size for a topic
+ RemoveMaxMessageSize(utils.TopicName) error
+
+ // GetMaxSubscriptionsPerTopic Get max subscriptions per topic
+ GetMaxSubscriptionsPerTopic(utils.TopicName) (int, error)
+
+ // SetMaxSubscriptionsPerTopic Set max subscriptions per topic
+ SetMaxSubscriptionsPerTopic(utils.TopicName, int) error
+
+ // RemoveMaxSubscriptionsPerTopic Remove max subscriptions per topic
+ RemoveMaxSubscriptionsPerTopic(utils.TopicName) error
+
+ // GetSchemaValidationEnforced Get schema validation enforced flag for a topic
+ GetSchemaValidationEnforced(utils.TopicName) (bool, error)
+
+ // SetSchemaValidationEnforced Set schema validation enforced flag for a topic
+ SetSchemaValidationEnforced(utils.TopicName, bool) error
+
+ // RemoveSchemaValidationEnforced Remove schema validation enforced flag for a topic
+ RemoveSchemaValidationEnforced(utils.TopicName) error
+
+ // GetDeduplicationSnapshotInterval Get deduplication snapshot interval for a topic
+ GetDeduplicationSnapshotInterval(utils.TopicName) (int, error)
+
+ // SetDeduplicationSnapshotInterval Set deduplication snapshot interval for a topic
+ SetDeduplicationSnapshotInterval(utils.TopicName, int) error
+
+ // RemoveDeduplicationSnapshotInterval Remove deduplication snapshot interval for a topic
+ RemoveDeduplicationSnapshotInterval(utils.TopicName) error
+
+ // GetReplicatorDispatchRate Get replicator dispatch rate for a topic
+ GetReplicatorDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
+
+ // SetReplicatorDispatchRate Set replicator dispatch rate for a topic
+ SetReplicatorDispatchRate(utils.TopicName, utils.DispatchRateData) error
+
+ // RemoveReplicatorDispatchRate Remove replicator dispatch rate for a topic
+ RemoveReplicatorDispatchRate(utils.TopicName) error
+
+ // GetOffloadPolicies Get offload policies for a topic
+ GetOffloadPolicies(utils.TopicName) (*utils.OffloadPolicies, error)
+
+ // SetOffloadPolicies Set offload policies for a topic
+ SetOffloadPolicies(utils.TopicName, utils.OffloadPolicies) error
+
+ // RemoveOffloadPolicies Remove offload policies for a topic
+ RemoveOffloadPolicies(utils.TopicName) error
+
+ // GetAutoSubscriptionCreation Get auto subscription creation override for a topic
+ GetAutoSubscriptionCreation(utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error)
+
+ // SetAutoSubscriptionCreation Set auto subscription creation override for a topic
+ SetAutoSubscriptionCreation(utils.TopicName,
+ utils.AutoSubscriptionCreationOverride) error
+
+ // RemoveAutoSubscriptionCreation Remove auto subscription creation override for a topic
+ RemoveAutoSubscriptionCreation(utils.TopicName) error
+
+ // GetSchemaCompatibilityStrategy Get schema compatibility strategy for a topic
+ GetSchemaCompatibilityStrategy(utils.TopicName) (utils.SchemaCompatibilityStrategy, error)
+
+ // SetSchemaCompatibilityStrategy Set schema compatibility strategy for a topic
+ SetSchemaCompatibilityStrategy(utils.TopicName,
+ utils.SchemaCompatibilityStrategy) error
+
+ // RemoveSchemaCompatibilityStrategy Remove schema compatibility strategy for a topic
+ RemoveSchemaCompatibilityStrategy(utils.TopicName) error
}
type topics struct {
@@ -933,3 +1034,192 @@
err := t.pulsar.Client.Get(endpoint, &data)
return data, err
}
+
+func (t *topics) GetSubscribeRate(topic utils.TopicName) (*utils.SubscribeRate, error) {
+ var subscribeRate utils.SubscribeRate
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
+ err := t.pulsar.Client.Get(endpoint, &subscribeRate)
+ return &subscribeRate, err
+}
+
+func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate utils.SubscribeRate) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
+ return t.pulsar.Client.Post(endpoint, &subscribeRate)
+}
+
+func (t *topics) RemoveSubscribeRate(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetSubscriptionDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
+ var dispatchRate utils.DispatchRateData
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
+ err := t.pulsar.Client.Get(endpoint, &dispatchRate)
+ return &dispatchRate, err
+}
+
+func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
+ return t.pulsar.Client.Post(endpoint, &dispatchRate)
+}
+
+func (t *topics) RemoveSubscriptionDispatchRate(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetMaxConsumersPerSubscription(topic utils.TopicName) (int, error) {
+ var maxConsumers int
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
+ err := t.pulsar.Client.Get(endpoint, &maxConsumers)
+ return maxConsumers, err
+}
+
+func (t *topics) SetMaxConsumersPerSubscription(topic utils.TopicName, maxConsumers int) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
+ return t.pulsar.Client.Post(endpoint, &maxConsumers)
+}
+
+func (t *topics) RemoveMaxConsumersPerSubscription(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetMaxMessageSize(topic utils.TopicName) (int, error) {
+ var maxMessageSize int
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
+ err := t.pulsar.Client.Get(endpoint, &maxMessageSize)
+ return maxMessageSize, err
+}
+
+func (t *topics) SetMaxMessageSize(topic utils.TopicName, maxMessageSize int) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
+ return t.pulsar.Client.Post(endpoint, &maxMessageSize)
+}
+
+func (t *topics) RemoveMaxMessageSize(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetMaxSubscriptionsPerTopic(topic utils.TopicName) (int, error) {
+ var maxSubscriptions int
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
+ err := t.pulsar.Client.Get(endpoint, &maxSubscriptions)
+ return maxSubscriptions, err
+}
+
+func (t *topics) SetMaxSubscriptionsPerTopic(topic utils.TopicName, maxSubscriptions int) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
+ return t.pulsar.Client.Post(endpoint, &maxSubscriptions)
+}
+
+func (t *topics) RemoveMaxSubscriptionsPerTopic(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetSchemaValidationEnforced(topic utils.TopicName) (bool, error) {
+ var enabled bool
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
+ err := t.pulsar.Client.Get(endpoint, &enabled)
+ return enabled, err
+}
+
+func (t *topics) SetSchemaValidationEnforced(topic utils.TopicName, enabled bool) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
+ return t.pulsar.Client.Post(endpoint, enabled)
+}
+
+func (t *topics) RemoveSchemaValidationEnforced(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetDeduplicationSnapshotInterval(topic utils.TopicName) (int, error) {
+ var interval int
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
+ err := t.pulsar.Client.Get(endpoint, &interval)
+ return interval, err
+}
+
+func (t *topics) SetDeduplicationSnapshotInterval(topic utils.TopicName, interval int) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
+ return t.pulsar.Client.Post(endpoint, &interval)
+}
+
+func (t *topics) RemoveDeduplicationSnapshotInterval(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetReplicatorDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
+ var dispatchRate utils.DispatchRateData
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
+ err := t.pulsar.Client.Get(endpoint, &dispatchRate)
+ return &dispatchRate, err
+}
+
+func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
+ return t.pulsar.Client.Post(endpoint, &dispatchRate)
+}
+
+func (t *topics) RemoveReplicatorDispatchRate(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetAutoSubscriptionCreation(topic utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error) {
+ var autoSubCreation utils.AutoSubscriptionCreationOverride
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
+ err := t.pulsar.Client.Get(endpoint, &autoSubCreation)
+ return &autoSubCreation, err
+}
+
+func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName,
+ autoSubCreation utils.AutoSubscriptionCreationOverride) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
+ return t.pulsar.Client.Post(endpoint, &autoSubCreation)
+}
+
+func (t *topics) RemoveAutoSubscriptionCreation(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetSchemaCompatibilityStrategy(topic utils.TopicName) (utils.SchemaCompatibilityStrategy, error) {
+ var strategy utils.SchemaCompatibilityStrategy
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
+ err := t.pulsar.Client.Get(endpoint, &strategy)
+ return strategy, err
+}
+
+func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
+ strategy utils.SchemaCompatibilityStrategy) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
+ return t.pulsar.Client.Put(endpoint, strategy)
+}
+
+func (t *topics) RemoveSchemaCompatibilityStrategy(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
+ return t.pulsar.Client.Delete(endpoint)
+}
+
+func (t *topics) GetOffloadPolicies(topic utils.TopicName) (*utils.OffloadPolicies, error) {
+ var offloadPolicies utils.OffloadPolicies
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
+ err := t.pulsar.Client.Get(endpoint, &offloadPolicies)
+ return &offloadPolicies, err
+}
+
+func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies utils.OffloadPolicies) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
+ return t.pulsar.Client.Post(endpoint, &offloadPolicies)
+}
+
+func (t *topics) RemoveOffloadPolicies(topic utils.TopicName) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
+ return t.pulsar.Client.Delete(endpoint)
+}
diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go
index be08b64..9abf802 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -21,6 +21,7 @@
"context"
"fmt"
"log"
+ "strings"
"testing"
"time"
@@ -551,3 +552,612 @@
100*time.Millisecond,
)
}
+
+func TestSubscribeRate(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default subscribe rate (adapt to actual server behavior)
+ initialSubscribeRate, err := admin.Topics().GetSubscribeRate(*topicName)
+ assert.NoError(t, err)
+ // Store initial values for later comparison instead of assuming specific defaults
+ initialConsumerRate := initialSubscribeRate.SubscribeThrottlingRatePerConsumer
+ initialRatePeriod := initialSubscribeRate.RatePeriodInSecond
+
+ // Set new subscribe rate
+ newSubscribeRate := utils.SubscribeRate{
+ SubscribeThrottlingRatePerConsumer: 10,
+ RatePeriodInSecond: 60,
+ }
+ err = admin.Topics().SetSubscribeRate(*topicName, newSubscribeRate)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ subscribeRate, err := admin.Topics().GetSubscribeRate(*topicName)
+ return err == nil &&
+ subscribeRate.SubscribeThrottlingRatePerConsumer == 10 &&
+ subscribeRate.RatePeriodInSecond == 60
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove subscribe rate policy
+ err = admin.Topics().RemoveSubscribeRate(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ subscribeRate, err := admin.Topics().GetSubscribeRate(*topicName)
+ return err == nil &&
+ subscribeRate.SubscribeThrottlingRatePerConsumer == initialConsumerRate &&
+ subscribeRate.RatePeriodInSecond == initialRatePeriod
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestSubscriptionDispatchRate(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default subscription dispatch rate (adapt to actual server behavior)
+ initialDispatchRate, err := admin.Topics().GetSubscriptionDispatchRate(*topicName)
+ assert.NoError(t, err)
+ // Store initial values for later comparison instead of assuming specific defaults
+ initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg
+ initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte
+ initialRatePeriod := initialDispatchRate.RatePeriodInSecond
+ initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate
+
+ // Set new subscription dispatch rate
+ newDispatchRate := utils.DispatchRateData{
+ DispatchThrottlingRateInMsg: 1000,
+ DispatchThrottlingRateInByte: 1048576, // 1MB
+ RatePeriodInSecond: 30,
+ RelativeToPublishRate: true,
+ }
+ err = admin.Topics().SetSubscriptionDispatchRate(*topicName, newDispatchRate)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ dispatchRate, err := admin.Topics().GetSubscriptionDispatchRate(*topicName)
+ return err == nil &&
+ dispatchRate.DispatchThrottlingRateInMsg == 1000 &&
+ dispatchRate.DispatchThrottlingRateInByte == 1048576 &&
+ dispatchRate.RatePeriodInSecond == 30 &&
+ dispatchRate.RelativeToPublishRate == true
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove subscription dispatch rate policy
+ err = admin.Topics().RemoveSubscriptionDispatchRate(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ dispatchRate, err := admin.Topics().GetSubscriptionDispatchRate(*topicName)
+ return err == nil &&
+ dispatchRate.DispatchThrottlingRateInMsg == initialMsgRate &&
+ dispatchRate.DispatchThrottlingRateInByte == initialByteRate &&
+ dispatchRate.RatePeriodInSecond == initialRatePeriod &&
+ dispatchRate.RelativeToPublishRate == initialRelativeToPublish
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestMaxConsumersPerSubscription(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default max consumers per subscription
+ maxConsumers, err := admin.Topics().GetMaxConsumersPerSubscription(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, maxConsumers)
+
+ // Set new max consumers per subscription
+ err = admin.Topics().SetMaxConsumersPerSubscription(*topicName, 10)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ maxConsumers, err = admin.Topics().GetMaxConsumersPerSubscription(*topicName)
+ return err == nil && maxConsumers == 10
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove max consumers per subscription policy
+ err = admin.Topics().RemoveMaxConsumersPerSubscription(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ maxConsumers, err = admin.Topics().GetMaxConsumersPerSubscription(*topicName)
+ return err == nil && maxConsumers == 0
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestMaxMessageSize(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default max message size
+ maxMessageSize, err := admin.Topics().GetMaxMessageSize(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, maxMessageSize)
+
+ // Set new max message size (1MB)
+ err = admin.Topics().SetMaxMessageSize(*topicName, 1048576)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ maxMessageSize, err = admin.Topics().GetMaxMessageSize(*topicName)
+ return err == nil && maxMessageSize == 1048576
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove max message size policy
+ err = admin.Topics().RemoveMaxMessageSize(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ maxMessageSize, err = admin.Topics().GetMaxMessageSize(*topicName)
+ return err == nil && maxMessageSize == 0
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestMaxSubscriptionsPerTopic(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default max subscriptions per topic
+ maxSubscriptions, err := admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, maxSubscriptions)
+
+ // Set new max subscriptions per topic
+ err = admin.Topics().SetMaxSubscriptionsPerTopic(*topicName, 100)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ maxSubscriptions, err = admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
+ return err == nil && maxSubscriptions == 100
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove max subscriptions per topic policy
+ err = admin.Topics().RemoveMaxSubscriptionsPerTopic(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ maxSubscriptions, err = admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
+ return err == nil && maxSubscriptions == 0
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestSchemaValidationEnforced(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default schema validation enforced
+ schemaValidationEnforced, err := admin.Topics().GetSchemaValidationEnforced(*topicName)
+ if err != nil {
+ // Skip test if API is not available (e.g., 405 Method Not Allowed)
+ if strings.Contains(err.Error(), "405") || strings.Contains(err.Error(), "Method Not Allowed") {
+ t.Skip("SchemaValidationEnforced API not available on this Pulsar version")
+ return
+ }
+ assert.NoError(t, err)
+ }
+ initialValidationEnforced := schemaValidationEnforced
+
+ // Set schema validation enforced to true
+ err = admin.Topics().SetSchemaValidationEnforced(*topicName, true)
+ if err != nil {
+ // Skip test if API is not available
+ if strings.Contains(err.Error(), "405") || strings.Contains(err.Error(), "Method Not Allowed") {
+ t.Skip("SetSchemaValidationEnforced API not available on this Pulsar version")
+ return
+ }
+ assert.NoError(t, err)
+ }
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ schemaValidationEnforced, err := admin.Topics().GetSchemaValidationEnforced(*topicName)
+ return err == nil && schemaValidationEnforced == true
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove schema validation enforced policy
+ err = admin.Topics().RemoveSchemaValidationEnforced(*topicName)
+ if err != nil {
+ // Skip removal check if API is not available
+ if strings.Contains(err.Error(), "405") || strings.Contains(err.Error(), "Method Not Allowed") {
+ t.Skip("RemoveSchemaValidationEnforced API not available on this Pulsar version")
+ return
+ }
+ assert.NoError(t, err)
+ }
+ assert.Eventually(
+ t,
+ func() bool {
+ schemaValidationEnforced, err := admin.Topics().GetSchemaValidationEnforced(*topicName)
+ return err == nil && schemaValidationEnforced == initialValidationEnforced
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestDeduplicationSnapshotInterval(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default deduplication snapshot interval
+ interval, err := admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, interval)
+
+ // Set new deduplication snapshot interval
+ err = admin.Topics().SetDeduplicationSnapshotInterval(*topicName, 1000)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ interval, err = admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
+ return err == nil && interval == 1000
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove deduplication snapshot interval policy
+ err = admin.Topics().RemoveDeduplicationSnapshotInterval(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ interval, err = admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
+ return err == nil && interval == 0
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestReplicatorDispatchRate(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default replicator dispatch rate (adapt to actual server behavior)
+ initialDispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName)
+ assert.NoError(t, err)
+ // Store initial values for later comparison instead of assuming specific defaults
+ initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg
+ initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte
+ initialRatePeriod := initialDispatchRate.RatePeriodInSecond
+ initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate
+
+ // Set new replicator dispatch rate
+ newDispatchRate := utils.DispatchRateData{
+ DispatchThrottlingRateInMsg: 500,
+ DispatchThrottlingRateInByte: 524288, // 512KB
+ RatePeriodInSecond: 60,
+ RelativeToPublishRate: true,
+ }
+ err = admin.Topics().SetReplicatorDispatchRate(*topicName, newDispatchRate)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ dispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName)
+ return err == nil &&
+ dispatchRate.DispatchThrottlingRateInMsg == 500 &&
+ dispatchRate.DispatchThrottlingRateInByte == 524288 &&
+ dispatchRate.RatePeriodInSecond == 60 &&
+ dispatchRate.RelativeToPublishRate == true
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove replicator dispatch rate policy
+ err = admin.Topics().RemoveReplicatorDispatchRate(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ dispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName)
+ return err == nil &&
+ dispatchRate.DispatchThrottlingRateInMsg == initialMsgRate &&
+ dispatchRate.DispatchThrottlingRateInByte == initialByteRate &&
+ dispatchRate.RatePeriodInSecond == initialRatePeriod &&
+ dispatchRate.RelativeToPublishRate == initialRelativeToPublish
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestOffloadPolicies(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default offload policies
+ offloadPolicies, err := admin.Topics().GetOffloadPolicies(*topicName)
+ assert.NoError(t, err)
+ // Default values should be empty/default
+ assert.Equal(t, "", offloadPolicies.ManagedLedgerOffloadDriver)
+ assert.Equal(t, 0, offloadPolicies.ManagedLedgerOffloadMaxThreads)
+
+ // Set new offload policies
+ newOffloadPolicies := utils.OffloadPolicies{
+ ManagedLedgerOffloadDriver: "aws-s3",
+ ManagedLedgerOffloadMaxThreads: 4,
+ ManagedLedgerOffloadThresholdInBytes: 1073741824, // 1GB
+ ManagedLedgerOffloadDeletionLagInMillis: 3600000, // 1 hour
+ ManagedLedgerOffloadAutoTriggerSizeThresholdBytes: 2147483648, // 2GB
+ S3ManagedLedgerOffloadBucket: "test-bucket",
+ S3ManagedLedgerOffloadRegion: "us-west-2",
+ S3ManagedLedgerOffloadServiceEndpoint: "https://s3.amazonaws.com",
+ }
+ err = admin.Topics().SetOffloadPolicies(*topicName, newOffloadPolicies)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ offloadPolicies, err = admin.Topics().GetOffloadPolicies(*topicName)
+ return err == nil &&
+ offloadPolicies.ManagedLedgerOffloadDriver == "aws-s3" &&
+ offloadPolicies.ManagedLedgerOffloadMaxThreads == 4 &&
+ offloadPolicies.ManagedLedgerOffloadThresholdInBytes == 1073741824 &&
+ offloadPolicies.S3ManagedLedgerOffloadBucket == "test-bucket"
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove offload policies
+ err = admin.Topics().RemoveOffloadPolicies(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ offloadPolicies, err = admin.Topics().GetOffloadPolicies(*topicName)
+ return err == nil &&
+ offloadPolicies.ManagedLedgerOffloadDriver == "" &&
+ offloadPolicies.ManagedLedgerOffloadMaxThreads == 0
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestAutoSubscriptionCreation(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default auto subscription creation
+ autoSubCreation, err := admin.Topics().GetAutoSubscriptionCreation(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, false, autoSubCreation.AllowAutoSubscriptionCreation)
+
+ // Set auto subscription creation to true
+ newAutoSubCreation := utils.AutoSubscriptionCreationOverride{
+ AllowAutoSubscriptionCreation: true,
+ }
+ err = admin.Topics().SetAutoSubscriptionCreation(*topicName, newAutoSubCreation)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ autoSubCreation, err = admin.Topics().GetAutoSubscriptionCreation(*topicName)
+ return err == nil &&
+ autoSubCreation.AllowAutoSubscriptionCreation == true
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove auto subscription creation policy
+ err = admin.Topics().RemoveAutoSubscriptionCreation(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ autoSubCreation, err = admin.Topics().GetAutoSubscriptionCreation(*topicName)
+ return err == nil &&
+ autoSubCreation.AllowAutoSubscriptionCreation == false
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestSchemaCompatibilityStrategy(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Get default schema compatibility strategy (adapt to actual server behavior)
+ initialStrategy, err := admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+ assert.NoError(t, err)
+ // Server may return empty string instead of "UNDEFINED"
+
+ // Set new schema compatibility strategy
+ err = admin.Topics().SetSchemaCompatibilityStrategy(*topicName, utils.SchemaCompatibilityStrategyBackward)
+ assert.NoError(t, err)
+
+ // topic policy is an async operation,
+ // so we need to wait for a while to get current value
+ assert.Eventually(
+ t,
+ func() bool {
+ strategy, err := admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+ return err == nil &&
+ strategy == utils.SchemaCompatibilityStrategyBackward
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove schema compatibility strategy policy
+ err = admin.Topics().RemoveSchemaCompatibilityStrategy(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ strategy, err := admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+ return err == nil &&
+ strategy == initialStrategy
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
diff --git a/pulsaradmin/pkg/utils/auto_subscription_creation.go b/pulsaradmin/pkg/utils/auto_subscription_creation.go
new file mode 100644
index 0000000..7f3b9da
--- /dev/null
+++ b/pulsaradmin/pkg/utils/auto_subscription_creation.go
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+type AutoSubscriptionCreationOverride struct {
+ AllowAutoSubscriptionCreation bool `json:"allowAutoSubscriptionCreation"`
+}
+
+func NewAutoSubscriptionCreationOverride() *AutoSubscriptionCreationOverride {
+ return &AutoSubscriptionCreationOverride{
+ AllowAutoSubscriptionCreation: false,
+ }
+}
diff --git a/pulsaradmin/pkg/utils/auto_subscription_creation_test.go b/pulsaradmin/pkg/utils/auto_subscription_creation_test.go
new file mode 100644
index 0000000..5a83a6d
--- /dev/null
+++ b/pulsaradmin/pkg/utils/auto_subscription_creation_test.go
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewAutoSubscriptionCreationOverride(t *testing.T) {
+ // Test that constructor returns non-nil pointer
+ override := NewAutoSubscriptionCreationOverride()
+ assert.NotNil(t, override)
+
+ // Test that default value is set correctly
+ assert.Equal(t, false, override.AllowAutoSubscriptionCreation)
+
+ // Test that it returns pointer to struct
+ assert.IsType(t, &AutoSubscriptionCreationOverride{}, override)
+}
+
+func TestAutoSubscriptionCreationOverride_JSONSerialization(t *testing.T) {
+ tests := []struct {
+ name string
+ override AutoSubscriptionCreationOverride
+ expected string
+ }{
+ {
+ name: "False value serialization",
+ override: AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: false},
+ expected: `{"allowAutoSubscriptionCreation":false}`,
+ },
+ {
+ name: "True value serialization",
+ override: AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: true},
+ expected: `{"allowAutoSubscriptionCreation":true}`,
+ },
+ {
+ name: "Default constructor serialization",
+ override: *NewAutoSubscriptionCreationOverride(),
+ expected: `{"allowAutoSubscriptionCreation":false}`,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ jsonData, err := json.Marshal(tt.override)
+ assert.NoError(t, err)
+ assert.Equal(t, tt.expected, string(jsonData))
+ })
+ }
+}
+
+func TestAutoSubscriptionCreationOverride_JSONDeserialization(t *testing.T) {
+ tests := []struct {
+ name string
+ jsonData string
+ expected AutoSubscriptionCreationOverride
+ wantErr bool
+ }{
+ {
+ name: "False value deserialization",
+ jsonData: `{"allowAutoSubscriptionCreation":false}`,
+ expected: AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: false},
+ wantErr: false,
+ },
+ {
+ name: "True value deserialization",
+ jsonData: `{"allowAutoSubscriptionCreation":true}`,
+ expected: AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: true},
+ wantErr: false,
+ },
+ {
+ name: "Empty JSON object",
+ jsonData: `{}`,
+ expected: AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: false},
+ wantErr: false,
+ },
+ {
+ name: "Extra fields ignored",
+ jsonData: `{"allowAutoSubscriptionCreation":true,"extraField":"ignored"}`,
+ expected: AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: true},
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var override AutoSubscriptionCreationOverride
+ err := json.Unmarshal([]byte(tt.jsonData), &override)
+
+ if tt.wantErr {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, tt.expected, override)
+ }
+ })
+ }
+}
+
+func TestAutoSubscriptionCreationOverride_FieldModification(t *testing.T) {
+ // Test field modification on constructor-created instance
+ override := NewAutoSubscriptionCreationOverride()
+
+ // Initial state should be false
+ assert.Equal(t, false, override.AllowAutoSubscriptionCreation)
+
+ // Modify to true
+ override.AllowAutoSubscriptionCreation = true
+ assert.Equal(t, true, override.AllowAutoSubscriptionCreation)
+
+ // Modify back to false
+ override.AllowAutoSubscriptionCreation = false
+ assert.Equal(t, false, override.AllowAutoSubscriptionCreation)
+
+ // Test field modification on manually created instance
+ manual := &AutoSubscriptionCreationOverride{
+ AllowAutoSubscriptionCreation: true,
+ }
+ assert.Equal(t, true, manual.AllowAutoSubscriptionCreation)
+
+ manual.AllowAutoSubscriptionCreation = false
+ assert.Equal(t, false, manual.AllowAutoSubscriptionCreation)
+}
+
+func TestAutoSubscriptionCreationOverride_InvalidJSON(t *testing.T) {
+ tests := []struct {
+ name string
+ jsonData string
+ }{
+ {
+ name: "Invalid JSON syntax",
+ jsonData: `{"allowAutoSubscriptionCreation":false`,
+ },
+ {
+ name: "Invalid boolean value",
+ jsonData: `{"allowAutoSubscriptionCreation":"invalid"}`,
+ },
+ {
+ name: "Invalid JSON structure",
+ jsonData: `[{"allowAutoSubscriptionCreation":true}]`,
+ },
+ {
+ name: "Completely invalid JSON",
+ jsonData: `invalid json`,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var override AutoSubscriptionCreationOverride
+ err := json.Unmarshal([]byte(tt.jsonData), &override)
+ assert.Error(t, err)
+ })
+ }
+}
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 58673dc..441aa33 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -486,6 +486,27 @@
type DelayedDeliveryData struct {
TickTime float64 `json:"tickTime"`
Active bool `json:"active"`
+ // MaxDelayInMillis is optional and was added for enhanced delayed delivery support
+ // Default value 0 means no maximum delay limit (backward compatible)
+ MaxDelayInMillis int64 `json:"maxDelayInMillis,omitempty"`
+}
+
+// NewDelayedDeliveryData creates a DelayedDeliveryData with backward compatible defaults
+func NewDelayedDeliveryData(tickTime float64, active bool) *DelayedDeliveryData {
+ return &DelayedDeliveryData{
+ TickTime: tickTime,
+ Active: active,
+ // MaxDelayInMillis is left as 0 (no limit) for backward compatibility
+ }
+}
+
+// NewDelayedDeliveryDataWithMaxDelay creates a DelayedDeliveryData with max delay limit
+func NewDelayedDeliveryDataWithMaxDelay(tickTime float64, active bool, maxDelayMs int64) *DelayedDeliveryData {
+ return &DelayedDeliveryData{
+ TickTime: tickTime,
+ Active: active,
+ MaxDelayInMillis: maxDelayMs,
+ }
}
type DispatchRateData struct {
diff --git a/pulsaradmin/pkg/utils/offload_policies.go b/pulsaradmin/pkg/utils/offload_policies.go
new file mode 100644
index 0000000..f39ddad
--- /dev/null
+++ b/pulsaradmin/pkg/utils/offload_policies.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+//nolint:lll
+type OffloadPolicies struct {
+ ManagedLedgerOffloadDriver string `json:"managedLedgerOffloadDriver,omitempty"`
+ ManagedLedgerOffloadMaxThreads int `json:"managedLedgerOffloadMaxThreads,omitempty"`
+ ManagedLedgerOffloadThresholdInBytes int64 `json:"managedLedgerOffloadThresholdInBytes,omitempty"`
+ ManagedLedgerOffloadDeletionLagInMillis int64 `json:"managedLedgerOffloadDeletionLagInMillis,omitempty"`
+ ManagedLedgerOffloadAutoTriggerSizeThresholdBytes int64 `json:"managedLedgerOffloadAutoTriggerSizeThresholdBytes,omitempty"`
+ S3ManagedLedgerOffloadBucket string `json:"s3ManagedLedgerOffloadBucket,omitempty"`
+ S3ManagedLedgerOffloadRegion string `json:"s3ManagedLedgerOffloadRegion,omitempty"`
+ S3ManagedLedgerOffloadServiceEndpoint string `json:"s3ManagedLedgerOffloadServiceEndpoint,omitempty"`
+ S3ManagedLedgerOffloadCredentialID string `json:"s3ManagedLedgerOffloadCredentialId,omitempty"`
+ S3ManagedLedgerOffloadCredentialSecret string `json:"s3ManagedLedgerOffloadCredentialSecret,omitempty"`
+ S3ManagedLedgerOffloadRole string `json:"s3ManagedLedgerOffloadRole,omitempty"`
+ S3ManagedLedgerOffloadRoleSessionName string `json:"s3ManagedLedgerOffloadRoleSessionName,omitempty"`
+ OffloadersDirectory string `json:"offloadersDirectory,omitempty"`
+ ManagedLedgerOffloadDriverMetadata map[string]string `json:"managedLedgerOffloadDriverMetadata,omitempty"`
+}
+
+func NewOffloadPolicies() *OffloadPolicies {
+ return &OffloadPolicies{
+ ManagedLedgerOffloadMaxThreads: 2,
+ ManagedLedgerOffloadThresholdInBytes: -1,
+ ManagedLedgerOffloadDeletionLagInMillis: 14400000, // 4 hours
+ ManagedLedgerOffloadAutoTriggerSizeThresholdBytes: -1,
+ ManagedLedgerOffloadDriverMetadata: make(map[string]string),
+ }
+}
diff --git a/pulsaradmin/pkg/utils/offload_policies_test.go b/pulsaradmin/pkg/utils/offload_policies_test.go
new file mode 100644
index 0000000..c142d7b
--- /dev/null
+++ b/pulsaradmin/pkg/utils/offload_policies_test.go
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewOffloadPolicies(t *testing.T) {
+ // Test that constructor returns non-nil pointer
+ policies := NewOffloadPolicies()
+ assert.NotNil(t, policies)
+
+ // Test that default values are set correctly
+ assert.Equal(t, 2, policies.ManagedLedgerOffloadMaxThreads)
+ assert.Equal(t, int64(-1), policies.ManagedLedgerOffloadThresholdInBytes)
+ assert.Equal(t, int64(14400000), policies.ManagedLedgerOffloadDeletionLagInMillis) // 4 hours
+ assert.Equal(t, int64(-1), policies.ManagedLedgerOffloadAutoTriggerSizeThresholdBytes)
+
+ // Test that map is initialized and empty
+ assert.NotNil(t, policies.ManagedLedgerOffloadDriverMetadata)
+ assert.Equal(t, 0, len(policies.ManagedLedgerOffloadDriverMetadata))
+
+ // Test that other fields are zero values
+ assert.Equal(t, "", policies.ManagedLedgerOffloadDriver)
+ assert.Equal(t, "", policies.S3ManagedLedgerOffloadBucket)
+ assert.Equal(t, "", policies.S3ManagedLedgerOffloadRegion)
+ assert.Equal(t, "", policies.S3ManagedLedgerOffloadServiceEndpoint)
+ assert.Equal(t, "", policies.S3ManagedLedgerOffloadCredentialID)
+ assert.Equal(t, "", policies.S3ManagedLedgerOffloadCredentialSecret)
+ assert.Equal(t, "", policies.S3ManagedLedgerOffloadRole)
+ assert.Equal(t, "", policies.S3ManagedLedgerOffloadRoleSessionName)
+ assert.Equal(t, "", policies.OffloadersDirectory)
+
+ // Test that it returns pointer to struct
+ assert.IsType(t, &OffloadPolicies{}, policies)
+}
+
+func TestOffloadPolicies_JSONSerialization(t *testing.T) {
+ tests := []struct {
+ name string
+ policies OffloadPolicies
+ expected string
+ }{
+ //nolint:lll
+ {
+ name: "Default constructor serialization (omitempty behavior)",
+ policies: *NewOffloadPolicies(),
+ expected: `{"managedLedgerOffloadMaxThreads":2,"managedLedgerOffloadThresholdInBytes":-1,"managedLedgerOffloadDeletionLagInMillis":14400000,"managedLedgerOffloadAutoTriggerSizeThresholdBytes":-1}`,
+ },
+ //nolint:lll
+ {
+ name: "Full object serialization",
+ policies: OffloadPolicies{
+ ManagedLedgerOffloadDriver: "s3",
+ ManagedLedgerOffloadMaxThreads: 4,
+ ManagedLedgerOffloadThresholdInBytes: 1000000,
+ ManagedLedgerOffloadDeletionLagInMillis: 7200000,
+ ManagedLedgerOffloadAutoTriggerSizeThresholdBytes: 500000,
+ S3ManagedLedgerOffloadBucket: "test-bucket",
+ S3ManagedLedgerOffloadRegion: "us-west-2",
+ S3ManagedLedgerOffloadServiceEndpoint: "https://s3.us-west-2.amazonaws.com",
+ S3ManagedLedgerOffloadCredentialID: "access-key",
+ S3ManagedLedgerOffloadCredentialSecret: "secret-key",
+ S3ManagedLedgerOffloadRole: "test-role",
+ S3ManagedLedgerOffloadRoleSessionName: "test-session",
+ OffloadersDirectory: "/opt/offloaders",
+ ManagedLedgerOffloadDriverMetadata: map[string]string{"key1": "value1", "key2": "value2"},
+ },
+ expected: `{"managedLedgerOffloadDriver":"s3","managedLedgerOffloadMaxThreads":4,"managedLedgerOffloadThresholdInBytes":1000000,"managedLedgerOffloadDeletionLagInMillis":7200000,"managedLedgerOffloadAutoTriggerSizeThresholdBytes":500000,"s3ManagedLedgerOffloadBucket":"test-bucket","s3ManagedLedgerOffloadRegion":"us-west-2","s3ManagedLedgerOffloadServiceEndpoint":"https://s3.us-west-2.amazonaws.com","s3ManagedLedgerOffloadCredentialId":"access-key","s3ManagedLedgerOffloadCredentialSecret":"secret-key","s3ManagedLedgerOffloadRole":"test-role","s3ManagedLedgerOffloadRoleSessionName":"test-session","offloadersDirectory":"/opt/offloaders","managedLedgerOffloadDriverMetadata":{"key1":"value1","key2":"value2"}}`,
+ },
+ {
+ name: "Empty struct serialization (zero values omitted)",
+ policies: OffloadPolicies{},
+ expected: `{}`,
+ },
+ {
+ name: "Non-empty map serialization",
+ policies: OffloadPolicies{
+ ManagedLedgerOffloadDriverMetadata: map[string]string{"key": "value"},
+ },
+ expected: `{"managedLedgerOffloadDriverMetadata":{"key":"value"}}`,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ jsonData, err := json.Marshal(tt.policies)
+ assert.NoError(t, err)
+ assert.Equal(t, tt.expected, string(jsonData))
+ })
+ }
+}
+
+func TestOffloadPolicies_JSONDeserialization(t *testing.T) {
+ tests := []struct {
+ name string
+ jsonData string
+ expected OffloadPolicies
+ wantErr bool
+ }{
+ //nolint:lll
+ {
+ name: "Complete JSON deserialization",
+ jsonData: `{"managedLedgerOffloadDriver":"s3","managedLedgerOffloadMaxThreads":4,"managedLedgerOffloadThresholdInBytes":1000000,"managedLedgerOffloadDeletionLagInMillis":7200000,"managedLedgerOffloadAutoTriggerSizeThresholdBytes":500000,"s3ManagedLedgerOffloadBucket":"test-bucket","s3ManagedLedgerOffloadRegion":"us-west-2","s3ManagedLedgerOffloadServiceEndpoint":"https://s3.us-west-2.amazonaws.com","s3ManagedLedgerOffloadCredentialId":"access-key","s3ManagedLedgerOffloadCredentialSecret":"secret-key","s3ManagedLedgerOffloadRole":"test-role","s3ManagedLedgerOffloadRoleSessionName":"test-session","offloadersDirectory":"/opt/offloaders","managedLedgerOffloadDriverMetadata":{"key1":"value1","key2":"value2"}}`,
+ expected: OffloadPolicies{
+ ManagedLedgerOffloadDriver: "s3",
+ ManagedLedgerOffloadMaxThreads: 4,
+ ManagedLedgerOffloadThresholdInBytes: 1000000,
+ ManagedLedgerOffloadDeletionLagInMillis: 7200000,
+ ManagedLedgerOffloadAutoTriggerSizeThresholdBytes: 500000,
+ S3ManagedLedgerOffloadBucket: "test-bucket",
+ S3ManagedLedgerOffloadRegion: "us-west-2",
+ S3ManagedLedgerOffloadServiceEndpoint: "https://s3.us-west-2.amazonaws.com",
+ S3ManagedLedgerOffloadCredentialID: "access-key",
+ S3ManagedLedgerOffloadCredentialSecret: "secret-key",
+ S3ManagedLedgerOffloadRole: "test-role",
+ S3ManagedLedgerOffloadRoleSessionName: "test-session",
+ OffloadersDirectory: "/opt/offloaders",
+ ManagedLedgerOffloadDriverMetadata: map[string]string{"key1": "value1", "key2": "value2"},
+ },
+ wantErr: false,
+ },
+ //nolint:lll
+ {
+ name: "Partial JSON deserialization",
+ jsonData: `{"managedLedgerOffloadDriver":"s3","managedLedgerOffloadMaxThreads":4,"s3ManagedLedgerOffloadBucket":"test-bucket"}`,
+ expected: OffloadPolicies{
+ ManagedLedgerOffloadDriver: "s3",
+ ManagedLedgerOffloadMaxThreads: 4,
+ S3ManagedLedgerOffloadBucket: "test-bucket",
+ // Other fields should be zero values
+ },
+ wantErr: false,
+ },
+ {
+ name: "Empty JSON object",
+ jsonData: `{}`,
+ expected: OffloadPolicies{
+ // All fields should be zero values
+ },
+ wantErr: false,
+ },
+ {
+ name: "JSON with metadata map",
+ jsonData: `{"managedLedgerOffloadDriverMetadata":{"env":"prod","version":"1.0"}}`,
+ expected: OffloadPolicies{
+ ManagedLedgerOffloadDriverMetadata: map[string]string{"env": "prod", "version": "1.0"},
+ },
+ wantErr: false,
+ },
+ {
+ name: "JSON with extra fields ignored",
+ jsonData: `{"managedLedgerOffloadDriver":"s3","extraField":"ignored","managedLedgerOffloadMaxThreads":4}`,
+ expected: OffloadPolicies{
+ ManagedLedgerOffloadDriver: "s3",
+ ManagedLedgerOffloadMaxThreads: 4,
+ },
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var policies OffloadPolicies
+ err := json.Unmarshal([]byte(tt.jsonData), &policies)
+
+ if tt.wantErr {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, tt.expected, policies)
+ }
+ })
+ }
+}
+
+func TestOffloadPolicies_FieldModification(t *testing.T) {
+ // Test field modification on constructor-created instance
+ policies := NewOffloadPolicies()
+
+ // Test modifying string fields
+ policies.ManagedLedgerOffloadDriver = "s3"
+ assert.Equal(t, "s3", policies.ManagedLedgerOffloadDriver)
+
+ policies.S3ManagedLedgerOffloadBucket = "test-bucket"
+ assert.Equal(t, "test-bucket", policies.S3ManagedLedgerOffloadBucket)
+
+ // Test modifying int field
+ policies.ManagedLedgerOffloadMaxThreads = 8
+ assert.Equal(t, 8, policies.ManagedLedgerOffloadMaxThreads)
+
+ // Test modifying int64 fields
+ policies.ManagedLedgerOffloadThresholdInBytes = 5000000
+ assert.Equal(t, int64(5000000), policies.ManagedLedgerOffloadThresholdInBytes)
+
+ policies.ManagedLedgerOffloadDeletionLagInMillis = 3600000 // 1 hour
+ assert.Equal(t, int64(3600000), policies.ManagedLedgerOffloadDeletionLagInMillis)
+
+ // Test field modification on manually created instance
+ manual := &OffloadPolicies{
+ ManagedLedgerOffloadDriver: "gcs",
+ ManagedLedgerOffloadMaxThreads: 4,
+ S3ManagedLedgerOffloadBucket: "initial-bucket",
+ }
+
+ assert.Equal(t, "gcs", manual.ManagedLedgerOffloadDriver)
+ assert.Equal(t, 4, manual.ManagedLedgerOffloadMaxThreads)
+ assert.Equal(t, "initial-bucket", manual.S3ManagedLedgerOffloadBucket)
+
+ // Modify the manually created instance
+ manual.ManagedLedgerOffloadDriver = "azure"
+ manual.ManagedLedgerOffloadMaxThreads = 6
+
+ assert.Equal(t, "azure", manual.ManagedLedgerOffloadDriver)
+ assert.Equal(t, 6, manual.ManagedLedgerOffloadMaxThreads)
+ assert.Equal(t, "initial-bucket", manual.S3ManagedLedgerOffloadBucket)
+}
+
+func TestOffloadPolicies_MapOperations(t *testing.T) {
+ policies := NewOffloadPolicies()
+
+ // Test initial state of map
+ assert.NotNil(t, policies.ManagedLedgerOffloadDriverMetadata)
+ assert.Equal(t, 0, len(policies.ManagedLedgerOffloadDriverMetadata))
+
+ // Test adding entries to map
+ policies.ManagedLedgerOffloadDriverMetadata["key1"] = "value1"
+ policies.ManagedLedgerOffloadDriverMetadata["key2"] = "value2"
+
+ assert.Equal(t, 2, len(policies.ManagedLedgerOffloadDriverMetadata))
+ assert.Equal(t, "value1", policies.ManagedLedgerOffloadDriverMetadata["key1"])
+ assert.Equal(t, "value2", policies.ManagedLedgerOffloadDriverMetadata["key2"])
+
+ // Test modifying existing entry
+ policies.ManagedLedgerOffloadDriverMetadata["key1"] = "modified_value1"
+ assert.Equal(t, "modified_value1", policies.ManagedLedgerOffloadDriverMetadata["key1"])
+
+ // Test deleting entry
+ delete(policies.ManagedLedgerOffloadDriverMetadata, "key2")
+ assert.Equal(t, 1, len(policies.ManagedLedgerOffloadDriverMetadata))
+ _, exists := policies.ManagedLedgerOffloadDriverMetadata["key2"]
+ assert.False(t, exists)
+
+ // Test with nil map (should not panic but won't work as expected)
+ nilMapPolicies := &OffloadPolicies{}
+ assert.Nil(t, nilMapPolicies.ManagedLedgerOffloadDriverMetadata)
+
+ // Initialize the map manually
+ nilMapPolicies.ManagedLedgerOffloadDriverMetadata = make(map[string]string)
+ nilMapPolicies.ManagedLedgerOffloadDriverMetadata["test"] = "value"
+ assert.Equal(t, "value", nilMapPolicies.ManagedLedgerOffloadDriverMetadata["test"])
+}
+
+func TestOffloadPolicies_InvalidJSON(t *testing.T) {
+ tests := []struct {
+ name string
+ jsonData string
+ }{
+ {
+ name: "Invalid JSON syntax",
+ jsonData: `{"managedLedgerOffloadDriver":"s3"`,
+ },
+ {
+ name: "Invalid int value",
+ jsonData: `{"managedLedgerOffloadMaxThreads":"invalid"}`,
+ },
+ {
+ name: "Invalid int64 value",
+ jsonData: `{"managedLedgerOffloadThresholdInBytes":"invalid"}`,
+ },
+ {
+ name: "Invalid JSON structure",
+ jsonData: `[{"managedLedgerOffloadDriver":"s3"}]`,
+ },
+ {
+ name: "Invalid map structure",
+ jsonData: `{"managedLedgerOffloadDriverMetadata":"invalid"}`,
+ },
+ {
+ name: "Completely invalid JSON",
+ jsonData: `invalid json`,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var policies OffloadPolicies
+ err := json.Unmarshal([]byte(tt.jsonData), &policies)
+ assert.Error(t, err)
+ })
+ }
+}