feat: align topic-level policy admin APIs with the Java RESTful APIs (#1398)

* feat: add topic configuration methods for subscribe rate, dispatch rate, max consumers, message size, subscriptions, schema validation, deduplication, replicator dispatch rate, offload policies, auto subscription creation, and schema compatibility strategy

* fix: lint error

* fix: lint

* add tests

* fix tests

* add unit tests
diff --git a/integration-tests/blue-green/docker-compose.yml b/integration-tests/blue-green/docker-compose.yml
index f04e24f..ae5de47 100644
--- a/integration-tests/blue-green/docker-compose.yml
+++ b/integration-tests/blue-green/docker-compose.yml
@@ -130,6 +130,7 @@
       - loadBalancerDebugModeEnabled=true
       - brokerServiceCompactionThresholdInBytes=1000000
       - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+      - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
     depends_on:
       green-zookeeper:
         condition: service_healthy
@@ -163,6 +164,7 @@
       - loadBalancerDebugModeEnabled=true
       - brokerServiceCompactionThresholdInBytes=1000000
       - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+      - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
     depends_on:
       green-zookeeper:
         condition: service_healthy
diff --git a/integration-tests/clustered/docker-compose.yml b/integration-tests/clustered/docker-compose.yml
index eb96ca9..8eb997e 100644
--- a/integration-tests/clustered/docker-compose.yml
+++ b/integration-tests/clustered/docker-compose.yml
@@ -122,6 +122,7 @@
       - advertisedListeners=internal:pulsar://broker-1:6650
       - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
       - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+      - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
     depends_on:
       zookeeper:
         condition: service_healthy
@@ -154,6 +155,7 @@
       - advertisedListeners=internal:pulsar://broker-2:6650
       - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
       - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+      - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
     depends_on:
       zookeeper:
         condition: service_healthy
diff --git a/integration-tests/extensible-load-manager/docker-compose.yml b/integration-tests/extensible-load-manager/docker-compose.yml
index 063a235..d2a88fb 100644
--- a/integration-tests/extensible-load-manager/docker-compose.yml
+++ b/integration-tests/extensible-load-manager/docker-compose.yml
@@ -129,6 +129,7 @@
       - clusterMigrationCheckDurationSeconds=1
       - brokerServiceCompactionThresholdInBytes=1000000
       - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+      - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
     depends_on:
       zookeeper:
         condition: service_healthy
@@ -168,6 +169,7 @@
       - clusterMigrationCheckDurationSeconds=1
       - brokerServiceCompactionThresholdInBytes=1000000
       - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+      - PULSAR_PREFIX_topicLevelPoliciesEnabled=true
     depends_on:
       zookeeper:
         condition: service_healthy
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index c7fe222..4d6fff1 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -389,6 +389,107 @@
 	// @param data
 	//        list of replication cluster id
 	SetReplicationClusters(topic utils.TopicName, data []string) error
+
+	// GetSubscribeRate Get subscribe rate configuration for a topic
+	GetSubscribeRate(utils.TopicName) (*utils.SubscribeRate, error)
+
+	// SetSubscribeRate Set subscribe rate configuration for a topic
+	SetSubscribeRate(utils.TopicName, utils.SubscribeRate) error
+
+	// RemoveSubscribeRate Remove subscribe rate configuration for a topic
+	RemoveSubscribeRate(utils.TopicName) error
+
+	// GetSubscriptionDispatchRate Get subscription dispatch rate for a topic
+	GetSubscriptionDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
+
+	// SetSubscriptionDispatchRate Set subscription dispatch rate for a topic
+	SetSubscriptionDispatchRate(utils.TopicName, utils.DispatchRateData) error
+
+	// RemoveSubscriptionDispatchRate Remove subscription dispatch rate for a topic
+	RemoveSubscriptionDispatchRate(utils.TopicName) error
+
+	// GetMaxConsumersPerSubscription Get max consumers per subscription for a topic
+	GetMaxConsumersPerSubscription(utils.TopicName) (int, error)
+
+	// SetMaxConsumersPerSubscription Set max consumers per subscription for a topic
+	SetMaxConsumersPerSubscription(utils.TopicName, int) error
+
+	// RemoveMaxConsumersPerSubscription Remove max consumers per subscription for a topic
+	RemoveMaxConsumersPerSubscription(utils.TopicName) error
+
+	// GetMaxMessageSize Get max message size for a topic
+	GetMaxMessageSize(utils.TopicName) (int, error)
+
+	// SetMaxMessageSize Set max message size for a topic
+	SetMaxMessageSize(utils.TopicName, int) error
+
+	// RemoveMaxMessageSize Remove max message size for a topic
+	RemoveMaxMessageSize(utils.TopicName) error
+
+	// GetMaxSubscriptionsPerTopic Get max subscriptions per topic
+	GetMaxSubscriptionsPerTopic(utils.TopicName) (int, error)
+
+	// SetMaxSubscriptionsPerTopic Set max subscriptions per topic
+	SetMaxSubscriptionsPerTopic(utils.TopicName, int) error
+
+	// RemoveMaxSubscriptionsPerTopic Remove max subscriptions per topic
+	RemoveMaxSubscriptionsPerTopic(utils.TopicName) error
+
+	// GetSchemaValidationEnforced Get schema validation enforced flag for a topic
+	GetSchemaValidationEnforced(utils.TopicName) (bool, error)
+
+	// SetSchemaValidationEnforced Set schema validation enforced flag for a topic
+	SetSchemaValidationEnforced(utils.TopicName, bool) error
+
+	// RemoveSchemaValidationEnforced Remove schema validation enforced flag for a topic
+	RemoveSchemaValidationEnforced(utils.TopicName) error
+
+	// GetDeduplicationSnapshotInterval Get deduplication snapshot interval for a topic
+	GetDeduplicationSnapshotInterval(utils.TopicName) (int, error)
+
+	// SetDeduplicationSnapshotInterval Set deduplication snapshot interval for a topic
+	SetDeduplicationSnapshotInterval(utils.TopicName, int) error
+
+	// RemoveDeduplicationSnapshotInterval Remove deduplication snapshot interval for a topic
+	RemoveDeduplicationSnapshotInterval(utils.TopicName) error
+
+	// GetReplicatorDispatchRate Get replicator dispatch rate for a topic
+	GetReplicatorDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
+
+	// SetReplicatorDispatchRate Set replicator dispatch rate for a topic
+	SetReplicatorDispatchRate(utils.TopicName, utils.DispatchRateData) error
+
+	// RemoveReplicatorDispatchRate Remove replicator dispatch rate for a topic
+	RemoveReplicatorDispatchRate(utils.TopicName) error
+
+	// GetOffloadPolicies Get offload policies for a topic
+	GetOffloadPolicies(utils.TopicName) (*utils.OffloadPolicies, error)
+
+	// SetOffloadPolicies Set offload policies for a topic
+	SetOffloadPolicies(utils.TopicName, utils.OffloadPolicies) error
+
+	// RemoveOffloadPolicies Remove offload policies for a topic
+	RemoveOffloadPolicies(utils.TopicName) error
+
+	// GetAutoSubscriptionCreation Get auto subscription creation override for a topic
+	GetAutoSubscriptionCreation(utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error)
+
+	// SetAutoSubscriptionCreation Set auto subscription creation override for a topic
+	SetAutoSubscriptionCreation(utils.TopicName,
+		utils.AutoSubscriptionCreationOverride) error
+
+	// RemoveAutoSubscriptionCreation Remove auto subscription creation override for a topic
+	RemoveAutoSubscriptionCreation(utils.TopicName) error
+
+	// GetSchemaCompatibilityStrategy Get schema compatibility strategy for a topic
+	GetSchemaCompatibilityStrategy(utils.TopicName) (utils.SchemaCompatibilityStrategy, error)
+
+	// SetSchemaCompatibilityStrategy Set schema compatibility strategy for a topic
+	SetSchemaCompatibilityStrategy(utils.TopicName,
+		utils.SchemaCompatibilityStrategy) error
+
+	// RemoveSchemaCompatibilityStrategy Remove schema compatibility strategy for a topic
+	RemoveSchemaCompatibilityStrategy(utils.TopicName) error
 }
 
 type topics struct {
@@ -933,3 +1034,192 @@
 	err := t.pulsar.Client.Get(endpoint, &data)
 	return data, err
 }
+
+func (t *topics) GetSubscribeRate(topic utils.TopicName) (*utils.SubscribeRate, error) {
+	var subscribeRate utils.SubscribeRate
+	endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
+	err := t.pulsar.Client.Get(endpoint, &subscribeRate)
+	return &subscribeRate, err
+}
+
+func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate utils.SubscribeRate) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "subscribeRate"), &subscribeRate)
+}
+
+func (t *topics) RemoveSubscribeRate(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "subscribeRate"))
+}
+
+func (t *topics) GetSubscriptionDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
+	var dispatchRate utils.DispatchRateData
+	endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
+	err := t.pulsar.Client.Get(endpoint, &dispatchRate)
+	return &dispatchRate, err
+}
+
+func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "subscriptionDispatchRate"), &dispatchRate)
+}
+
+func (t *topics) RemoveSubscriptionDispatchRate(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "subscriptionDispatchRate"))
+}
+
+// GetMaxConsumersPerSubscription reads the topic's "maxConsumersPerSubscription" endpoint through Client.Get.
+func (t *topics) GetMaxConsumersPerSubscription(topic utils.TopicName) (int, error) {
+	var limit int
+	err := t.pulsar.Client.Get(t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription"), &limit)
+	return limit, err
+}
+
+func (t *topics) SetMaxConsumersPerSubscription(topic utils.TopicName, maxConsumers int) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "maxConsumersPerSubscription"), &maxConsumers)
+}
+
+func (t *topics) RemoveMaxConsumersPerSubscription(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "maxConsumersPerSubscription"))
+}
+
+// GetMaxMessageSize reads the topic's "maxMessageSize" endpoint through Client.Get.
+func (t *topics) GetMaxMessageSize(topic utils.TopicName) (int, error) {
+	var size int
+	err := t.pulsar.Client.Get(t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize"), &size)
+	return size, err
+}
+
+func (t *topics) SetMaxMessageSize(topic utils.TopicName, maxMessageSize int) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "maxMessageSize"), &maxMessageSize)
+}
+
+func (t *topics) RemoveMaxMessageSize(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "maxMessageSize"))
+}
+
+// GetMaxSubscriptionsPerTopic reads the topic's "maxSubscriptionsPerTopic" endpoint through Client.Get.
+func (t *topics) GetMaxSubscriptionsPerTopic(topic utils.TopicName) (int, error) {
+	var limit int
+	err := t.pulsar.Client.Get(t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic"), &limit)
+	return limit, err
+}
+
+func (t *topics) SetMaxSubscriptionsPerTopic(topic utils.TopicName, maxSubscriptions int) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "maxSubscriptionsPerTopic"), &maxSubscriptions)
+}
+
+func (t *topics) RemoveMaxSubscriptionsPerTopic(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "maxSubscriptionsPerTopic"))
+}
+
+// GetSchemaValidationEnforced reads the topic's "schemaValidationEnforced" endpoint through Client.Get.
+func (t *topics) GetSchemaValidationEnforced(topic utils.TopicName) (bool, error) {
+	var enforced bool
+	err := t.pulsar.Client.Get(t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced"), &enforced)
+	return enforced, err
+}
+
+func (t *topics) SetSchemaValidationEnforced(topic utils.TopicName, enabled bool) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "schemaValidationEnforced"), enabled)
+}
+
+func (t *topics) RemoveSchemaValidationEnforced(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "schemaValidationEnforced"))
+}
+
+// GetDeduplicationSnapshotInterval reads the topic's "deduplicationSnapshotInterval" endpoint through Client.Get.
+func (t *topics) GetDeduplicationSnapshotInterval(topic utils.TopicName) (int, error) {
+	var n int
+	err := t.pulsar.Client.Get(t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval"), &n)
+	return n, err
+}
+
+func (t *topics) SetDeduplicationSnapshotInterval(topic utils.TopicName, interval int) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "deduplicationSnapshotInterval"), &interval)
+}
+
+func (t *topics) RemoveDeduplicationSnapshotInterval(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "deduplicationSnapshotInterval"))
+}
+
+func (t *topics) GetReplicatorDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
+	var dispatchRate utils.DispatchRateData
+	endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
+	err := t.pulsar.Client.Get(endpoint, &dispatchRate)
+	return &dispatchRate, err
+}
+
+func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "replicatorDispatchRate"), &dispatchRate)
+}
+
+func (t *topics) RemoveReplicatorDispatchRate(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "replicatorDispatchRate"))
+}
+
+func (t *topics) GetAutoSubscriptionCreation(topic utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error) {
+	var autoSubCreation utils.AutoSubscriptionCreationOverride
+	endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
+	err := t.pulsar.Client.Get(endpoint, &autoSubCreation)
+	return &autoSubCreation, err
+}
+
+func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName,
+	autoSubCreation utils.AutoSubscriptionCreationOverride) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "autoSubscriptionCreation"), &autoSubCreation)
+}
+
+func (t *topics) RemoveAutoSubscriptionCreation(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "autoSubscriptionCreation"))
+}
+
+// GetSchemaCompatibilityStrategy reads the topic's "schemaCompatibilityStrategy" endpoint through Client.Get.
+func (t *topics) GetSchemaCompatibilityStrategy(topic utils.TopicName) (utils.SchemaCompatibilityStrategy, error) {
+	var out utils.SchemaCompatibilityStrategy
+	err := t.pulsar.Client.Get(t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy"), &out)
+	return out, err
+}
+
+func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
+	strategy utils.SchemaCompatibilityStrategy) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Put(t.pulsar.endpoint(t.basePath, restPath, "schemaCompatibilityStrategy"), strategy)
+}
+
+func (t *topics) RemoveSchemaCompatibilityStrategy(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "schemaCompatibilityStrategy"))
+}
+
+func (t *topics) GetOffloadPolicies(topic utils.TopicName) (*utils.OffloadPolicies, error) {
+	var offloadPolicies utils.OffloadPolicies
+	endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
+	err := t.pulsar.Client.Get(endpoint, &offloadPolicies)
+	return &offloadPolicies, err
+}
+
+func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies utils.OffloadPolicies) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Post(t.pulsar.endpoint(t.basePath, restPath, "offloadPolicies"), &offloadPolicies)
+}
+
+func (t *topics) RemoveOffloadPolicies(topic utils.TopicName) error {
+	restPath := topic.GetRestPath()
+	return t.pulsar.Client.Delete(t.pulsar.endpoint(t.basePath, restPath, "offloadPolicies"))
+}
diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go
index be08b64..9abf802 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -21,6 +21,7 @@
 	"context"
 	"fmt"
 	"log"
+	"strings"
 	"testing"
 	"time"
 
@@ -551,3 +552,612 @@
 		100*time.Millisecond,
 	)
 }
+
+func TestSubscribeRate(t *testing.T) {
+	// Create a topic with a newly generated name.
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	assert.NoError(t, admin.Topics().Create(*topicName, 4))
+
+	// Record whatever the server reports before any policy is set; the removal
+	// check below compares against these values rather than assumed defaults.
+	before, err := admin.Topics().GetSubscribeRate(*topicName)
+	assert.NoError(t, err)
+	beforeConsumerRate := before.SubscribeThrottlingRatePerConsumer
+	beforeRatePeriod := before.RatePeriodInSecond
+
+	// Apply a topic-level subscribe rate.
+	want := utils.SubscribeRate{
+		SubscribeThrottlingRatePerConsumer: 10,
+		RatePeriodInSecond:                 60,
+	}
+	assert.NoError(t, admin.Topics().SetSubscribeRate(*topicName, want))
+
+	// The update is not visible immediately, so poll until the getter
+	// returns the values that were just set.
+	assert.Eventually(
+		t,
+		func() bool {
+			got, getErr := admin.Topics().GetSubscribeRate(*topicName)
+			if getErr != nil {
+				return false
+			}
+			return got.SubscribeThrottlingRatePerConsumer == want.SubscribeThrottlingRatePerConsumer &&
+				got.RatePeriodInSecond == want.RatePeriodInSecond
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Drop the policy and wait for the getter to report the recorded values again.
+	assert.NoError(t, admin.Topics().RemoveSubscribeRate(*topicName))
+	assert.Eventually(
+		t,
+		func() bool {
+			got, getErr := admin.Topics().GetSubscribeRate(*topicName)
+			if getErr != nil {
+				return false
+			}
+			return got.SubscribeThrottlingRatePerConsumer == beforeConsumerRate &&
+				got.RatePeriodInSecond == beforeRatePeriod
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+func TestSubscriptionDispatchRate(t *testing.T) {
+	// Create a topic with a newly generated name.
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	assert.NoError(t, admin.Topics().Create(*topicName, 4))
+
+	// Record whatever the server reports before any policy is set; the removal
+	// check below compares against these values rather than assumed defaults.
+	before, err := admin.Topics().GetSubscriptionDispatchRate(*topicName)
+	assert.NoError(t, err)
+	beforeMsgRate := before.DispatchThrottlingRateInMsg
+	beforeByteRate := before.DispatchThrottlingRateInByte
+	beforeRatePeriod := before.RatePeriodInSecond
+	beforeRelative := before.RelativeToPublishRate
+
+	// Apply a topic-level subscription dispatch rate.
+	want := utils.DispatchRateData{
+		DispatchThrottlingRateInMsg:  1000,
+		DispatchThrottlingRateInByte: 1048576, // 1MB
+		RatePeriodInSecond:           30,
+		RelativeToPublishRate:        true,
+	}
+	assert.NoError(t, admin.Topics().SetSubscriptionDispatchRate(*topicName, want))
+
+	// The update is not visible immediately, so poll until the getter
+	// returns the values that were just set.
+	assert.Eventually(
+		t,
+		func() bool {
+			got, getErr := admin.Topics().GetSubscriptionDispatchRate(*topicName)
+			if getErr != nil {
+				return false
+			}
+			return got.DispatchThrottlingRateInMsg == want.DispatchThrottlingRateInMsg &&
+				got.DispatchThrottlingRateInByte == want.DispatchThrottlingRateInByte &&
+				got.RatePeriodInSecond == want.RatePeriodInSecond &&
+				got.RelativeToPublishRate
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Drop the policy and wait for the getter to report the recorded values again.
+	assert.NoError(t, admin.Topics().RemoveSubscriptionDispatchRate(*topicName))
+	assert.Eventually(
+		t,
+		func() bool {
+			got, getErr := admin.Topics().GetSubscriptionDispatchRate(*topicName)
+			if getErr != nil {
+				return false
+			}
+			return got.DispatchThrottlingRateInMsg == beforeMsgRate &&
+				got.DispatchThrottlingRateInByte == beforeByteRate &&
+				got.RatePeriodInSecond == beforeRatePeriod &&
+				got.RelativeToPublishRate == beforeRelative
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+// TestMaxConsumersPerSubscription sets, reads back and removes the topic-level
+// max-consumers-per-subscription policy through the Topics admin API.
+func TestMaxConsumersPerSubscription(t *testing.T) {
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	err = admin.Topics().Create(*topicName, 4)
+	assert.NoError(t, err)
+
+	// Before any policy is set the test expects the getter to report 0.
+	maxConsumers, err := admin.Topics().GetMaxConsumersPerSubscription(*topicName)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, maxConsumers)
+
+	// Set new max consumers per subscription
+	err = admin.Topics().SetMaxConsumersPerSubscription(*topicName, 10)
+	assert.NoError(t, err)
+
+	// Topic policy updates are asynchronous, so poll for the new value. The closure
+	// keeps its own result variables instead of writing to the test's locals.
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetMaxConsumersPerSubscription(*topicName)
+			return getErr == nil && current == 10
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Remove the policy and wait until the getter reports 0 again.
+	err = admin.Topics().RemoveMaxConsumersPerSubscription(*topicName)
+	assert.NoError(t, err)
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetMaxConsumersPerSubscription(*topicName)
+			return getErr == nil && current == 0
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+// TestMaxMessageSize sets, reads back and removes the topic-level
+// max-message-size policy through the Topics admin API.
+func TestMaxMessageSize(t *testing.T) {
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	err = admin.Topics().Create(*topicName, 4)
+	assert.NoError(t, err)
+
+	// Before any policy is set the test expects the getter to report 0.
+	maxMessageSize, err := admin.Topics().GetMaxMessageSize(*topicName)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, maxMessageSize)
+
+	// Set new max message size (1MB)
+	err = admin.Topics().SetMaxMessageSize(*topicName, 1048576)
+	assert.NoError(t, err)
+
+	// Topic policy updates are asynchronous, so poll for the new value. The closure
+	// keeps its own result variables instead of writing to the test's locals.
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetMaxMessageSize(*topicName)
+			return getErr == nil && current == 1048576
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Remove the policy and wait until the getter reports 0 again.
+	err = admin.Topics().RemoveMaxMessageSize(*topicName)
+	assert.NoError(t, err)
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetMaxMessageSize(*topicName)
+			return getErr == nil && current == 0
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+// TestMaxSubscriptionsPerTopic sets, reads back and removes the topic-level
+// max-subscriptions-per-topic policy through the Topics admin API.
+func TestMaxSubscriptionsPerTopic(t *testing.T) {
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	err = admin.Topics().Create(*topicName, 4)
+	assert.NoError(t, err)
+
+	// Before any policy is set the test expects the getter to report 0.
+	maxSubscriptions, err := admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, maxSubscriptions)
+
+	// Set new max subscriptions per topic
+	err = admin.Topics().SetMaxSubscriptionsPerTopic(*topicName, 100)
+	assert.NoError(t, err)
+
+	// Topic policy updates are asynchronous, so poll for the new value. The closure
+	// keeps its own result variables instead of writing to the test's locals.
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
+			return getErr == nil && current == 100
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Remove the policy and wait until the getter reports 0 again.
+	err = admin.Topics().RemoveMaxSubscriptionsPerTopic(*topicName)
+	assert.NoError(t, err)
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
+			return getErr == nil && current == 0
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+func TestSchemaValidationEnforced(t *testing.T) {
+	// Create a topic with a newly generated name.
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	err = admin.Topics().Create(*topicName, 4)
+	assert.NoError(t, err)
+
+	// Record the value reported before any policy is set; the removal check
+	// below compares against it.
+	before, err := admin.Topics().GetSchemaValidationEnforced(*topicName)
+	if err != nil {
+		// An error mentioning 405 / "Method Not Allowed" is treated as "API not available".
+		reason := err.Error()
+		if strings.Contains(reason, "405") || strings.Contains(reason, "Method Not Allowed") {
+			t.Skip("SchemaValidationEnforced API not available on this Pulsar version")
+		}
+		assert.NoError(t, err)
+	}
+
+	// Enforce schema validation on the topic.
+	err = admin.Topics().SetSchemaValidationEnforced(*topicName, true)
+	if err != nil {
+		// An error mentioning 405 / "Method Not Allowed" is treated as "API not available".
+		reason := err.Error()
+		if strings.Contains(reason, "405") || strings.Contains(reason, "Method Not Allowed") {
+			t.Skip("SetSchemaValidationEnforced API not available on this Pulsar version")
+		}
+		assert.NoError(t, err)
+	}
+
+	// The update is not visible immediately, so poll until the getter
+	// reports true.
+	assert.Eventually(
+		t,
+		func() bool {
+			got, getErr := admin.Topics().GetSchemaValidationEnforced(*topicName)
+			return getErr == nil && got
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Drop the policy and wait for the getter to report the recorded
+	// value again.
+	err = admin.Topics().RemoveSchemaValidationEnforced(*topicName)
+	if err != nil {
+		// An error mentioning 405 / "Method Not Allowed" is treated as "API not available".
+		reason := err.Error()
+		if strings.Contains(reason, "405") || strings.Contains(reason, "Method Not Allowed") {
+			t.Skip("RemoveSchemaValidationEnforced API not available on this Pulsar version")
+		}
+		assert.NoError(t, err)
+	}
+	assert.Eventually(
+		t,
+		func() bool {
+			got, getErr := admin.Topics().GetSchemaValidationEnforced(*topicName)
+			return getErr == nil && got == before
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+// TestDeduplicationSnapshotInterval sets, reads back and removes the topic-level
+// deduplication snapshot interval policy through the Topics admin API.
+func TestDeduplicationSnapshotInterval(t *testing.T) {
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	err = admin.Topics().Create(*topicName, 4)
+	assert.NoError(t, err)
+
+	// Before any policy is set the test expects the getter to report 0.
+	interval, err := admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, interval)
+
+	// Set new deduplication snapshot interval
+	err = admin.Topics().SetDeduplicationSnapshotInterval(*topicName, 1000)
+	assert.NoError(t, err)
+
+	// Topic policy updates are asynchronous, so poll for the new value. The closure
+	// keeps its own result variables instead of writing to the test's locals.
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
+			return getErr == nil && current == 1000
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Remove the policy and wait until the getter reports 0 again.
+	err = admin.Topics().RemoveDeduplicationSnapshotInterval(*topicName)
+	assert.NoError(t, err)
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
+			return getErr == nil && current == 0
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+func TestReplicatorDispatchRate(t *testing.T) {
+	// Create a topic with a newly generated name.
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	assert.NoError(t, admin.Topics().Create(*topicName, 4))
+
+	// Record whatever the server reports before any policy is set; the removal
+	// check below compares against these values rather than assumed defaults.
+	before, err := admin.Topics().GetReplicatorDispatchRate(*topicName)
+	assert.NoError(t, err)
+	beforeMsgRate := before.DispatchThrottlingRateInMsg
+	beforeByteRate := before.DispatchThrottlingRateInByte
+	beforeRatePeriod := before.RatePeriodInSecond
+	beforeRelative := before.RelativeToPublishRate
+
+	// Apply a topic-level replicator dispatch rate.
+	want := utils.DispatchRateData{
+		DispatchThrottlingRateInMsg:  500,
+		DispatchThrottlingRateInByte: 524288, // 512KB
+		RatePeriodInSecond:           60,
+		RelativeToPublishRate:        true,
+	}
+	assert.NoError(t, admin.Topics().SetReplicatorDispatchRate(*topicName, want))
+
+	// The update is not visible immediately, so poll until the getter
+	// returns the values that were just set.
+	assert.Eventually(
+		t,
+		func() bool {
+			got, getErr := admin.Topics().GetReplicatorDispatchRate(*topicName)
+			if getErr != nil {
+				return false
+			}
+			return got.DispatchThrottlingRateInMsg == want.DispatchThrottlingRateInMsg &&
+				got.DispatchThrottlingRateInByte == want.DispatchThrottlingRateInByte &&
+				got.RatePeriodInSecond == want.RatePeriodInSecond &&
+				got.RelativeToPublishRate
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Drop the policy and wait for the getter to report the recorded values again.
+	assert.NoError(t, admin.Topics().RemoveReplicatorDispatchRate(*topicName))
+	assert.Eventually(
+		t,
+		func() bool {
+			got, getErr := admin.Topics().GetReplicatorDispatchRate(*topicName)
+			if getErr != nil {
+				return false
+			}
+			return got.DispatchThrottlingRateInMsg == beforeMsgRate &&
+				got.DispatchThrottlingRateInByte == beforeByteRate &&
+				got.RatePeriodInSecond == beforeRatePeriod &&
+				got.RelativeToPublishRate == beforeRelative
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+// TestOffloadPolicies sets topic-level offload policies, waits until they can
+// be read back, then removes them and waits for the empty defaults to return.
+func TestOffloadPolicies(t *testing.T) {
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	err = admin.Topics().Create(*topicName, 4)
+	assert.NoError(t, err)
+
+	// Get default offload policies
+	offloadPolicies, err := admin.Topics().GetOffloadPolicies(*topicName)
+	assert.NoError(t, err)
+	// Default values should be empty/default
+	assert.Equal(t, "", offloadPolicies.ManagedLedgerOffloadDriver)
+	assert.Equal(t, 0, offloadPolicies.ManagedLedgerOffloadMaxThreads)
+
+	// Set new offload policies
+	newOffloadPolicies := utils.OffloadPolicies{
+		ManagedLedgerOffloadDriver:                        "aws-s3",
+		ManagedLedgerOffloadMaxThreads:                    4,
+		ManagedLedgerOffloadThresholdInBytes:              1073741824, // 1GB
+		ManagedLedgerOffloadDeletionLagInMillis:           3600000,    // 1 hour
+		ManagedLedgerOffloadAutoTriggerSizeThresholdBytes: 2147483648, // 2GB
+		S3ManagedLedgerOffloadBucket:                      "test-bucket",
+		S3ManagedLedgerOffloadRegion:                      "us-west-2",
+		S3ManagedLedgerOffloadServiceEndpoint:             "https://s3.amazonaws.com",
+	}
+	err = admin.Topics().SetOffloadPolicies(*topicName, newOffloadPolicies)
+	assert.NoError(t, err)
+
+	// Topic policies are applied asynchronously, so poll for the new value.
+	// The condition runs in its own goroutine, hence the closure-local results.
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetOffloadPolicies(*topicName)
+			return getErr == nil &&
+				current.ManagedLedgerOffloadDriver == "aws-s3" &&
+				current.ManagedLedgerOffloadMaxThreads == 4 &&
+				current.ManagedLedgerOffloadThresholdInBytes == 1073741824 &&
+				current.S3ManagedLedgerOffloadBucket == "test-bucket"
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Remove offload policies
+	err = admin.Topics().RemoveOffloadPolicies(*topicName)
+	assert.NoError(t, err)
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetOffloadPolicies(*topicName)
+			return getErr == nil &&
+				current.ManagedLedgerOffloadDriver == "" &&
+				current.ManagedLedgerOffloadMaxThreads == 0
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+// TestAutoSubscriptionCreation enables the topic-level auto subscription
+// creation override, waits for it to be readable, then removes it again.
+func TestAutoSubscriptionCreation(t *testing.T) {
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	err = admin.Topics().Create(*topicName, 4)
+	assert.NoError(t, err)
+
+	// Get default auto subscription creation
+	autoSubCreation, err := admin.Topics().GetAutoSubscriptionCreation(*topicName)
+	assert.NoError(t, err)
+	assert.False(t, autoSubCreation.AllowAutoSubscriptionCreation)
+
+	// Set auto subscription creation to true
+	newAutoSubCreation := utils.AutoSubscriptionCreationOverride{
+		AllowAutoSubscriptionCreation: true,
+	}
+	err = admin.Topics().SetAutoSubscriptionCreation(*topicName, newAutoSubCreation)
+	assert.NoError(t, err)
+
+	// Topic policies are applied asynchronously, so poll for the new value.
+	// The condition runs in its own goroutine, hence the closure-local results.
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetAutoSubscriptionCreation(*topicName)
+			return getErr == nil &&
+				current.AllowAutoSubscriptionCreation
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+
+	// Remove auto subscription creation policy
+	err = admin.Topics().RemoveAutoSubscriptionCreation(*topicName)
+	assert.NoError(t, err)
+	assert.Eventually(
+		t,
+		func() bool {
+			current, getErr := admin.Topics().GetAutoSubscriptionCreation(*topicName)
+			return getErr == nil &&
+				!current.AllowAutoSubscriptionCreation
+		},
+		10*time.Second,
+		100*time.Millisecond,
+	)
+}
+
+// TestSchemaCompatibilityStrategy applies SchemaCompatibilityStrategyBackward to
+// a new topic, then removes the policy and expects the initial value to return.
+func TestSchemaCompatibilityStrategy(t *testing.T) {
+	topic := "persistent://public/default/" + newTopicName()
+	admin, err := New(&config.Config{})
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	topicName, err := utils.GetTopicName(topic)
+	assert.NoError(t, err)
+	err = admin.Topics().Create(*topicName, 4)
+	assert.NoError(t, err)
+
+	// Record what the server reports before any policy is set; this may be an
+	// empty string rather than "UNDEFINED", so the test does not assert on it.
+	initialStrategy, err := admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+	assert.NoError(t, err)
+
+	// Polling window shared by both waits below.
+	const waitFor, tick = 10 * time.Second, 100 * time.Millisecond
+
+	// Apply the new strategy; the policy update is asynchronous, so poll for it.
+	err = admin.Topics().SetSchemaCompatibilityStrategy(*topicName, utils.SchemaCompatibilityStrategyBackward)
+	assert.NoError(t, err)
+	backwardVisible := func() bool {
+		got, getErr := admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+		return getErr == nil && got == utils.SchemaCompatibilityStrategyBackward
+	}
+	assert.Eventually(
+		t,
+		backwardVisible,
+		waitFor,
+		tick,
+	)
+
+	// Drop the policy again and wait until the initial value is reported.
+	err = admin.Topics().RemoveSchemaCompatibilityStrategy(*topicName)
+	assert.NoError(t, err)
+	initialRestored := func() bool {
+		got, getErr := admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+		return getErr == nil && got == initialStrategy
+	}
+	assert.Eventually(
+		t,
+		initialRestored,
+		waitFor,
+		tick,
+	)
+}
diff --git a/pulsaradmin/pkg/utils/auto_subscription_creation.go b/pulsaradmin/pkg/utils/auto_subscription_creation.go
new file mode 100644
index 0000000..7f3b9da
--- /dev/null
+++ b/pulsaradmin/pkg/utils/auto_subscription_creation.go
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+// AutoSubscriptionCreationOverride carries the allowAutoSubscriptionCreation flag as JSON.
+type AutoSubscriptionCreationOverride struct {
+	AllowAutoSubscriptionCreation bool `json:"allowAutoSubscriptionCreation"`
+}
+
+// NewAutoSubscriptionCreationOverride returns an override whose flag is false.
+func NewAutoSubscriptionCreationOverride() *AutoSubscriptionCreationOverride {
+	return &AutoSubscriptionCreationOverride{}
+}
diff --git a/pulsaradmin/pkg/utils/auto_subscription_creation_test.go b/pulsaradmin/pkg/utils/auto_subscription_creation_test.go
new file mode 100644
index 0000000..5a83a6d
--- /dev/null
+++ b/pulsaradmin/pkg/utils/auto_subscription_creation_test.go
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestNewAutoSubscriptionCreationOverride checks the constructor's result:
+// a non-nil *AutoSubscriptionCreationOverride whose flag is false.
+func TestNewAutoSubscriptionCreationOverride(t *testing.T) {
+	created := NewAutoSubscriptionCreationOverride()
+	assert.NotNil(t, created)
+
+	// The flag starts out disabled.
+	assert.Equal(t, false, created.AllowAutoSubscriptionCreation)
+	// The dynamic type is a pointer to the struct.
+	assert.IsType(t, &AutoSubscriptionCreationOverride{}, created)
+}
+
+// TestAutoSubscriptionCreationOverride_JSONSerialization pins the JSON produced by json.Marshal.
+func TestAutoSubscriptionCreationOverride_JSONSerialization(t *testing.T) {
+	cases := []struct {
+		title string
+		input AutoSubscriptionCreationOverride
+		want  string
+	}{
+		{
+			title: "False value serialization",
+			input: AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: false},
+			want:  `{"allowAutoSubscriptionCreation":false}`,
+		},
+		{
+			title: "True value serialization",
+			input: AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: true},
+			want:  `{"allowAutoSubscriptionCreation":true}`,
+		},
+		{
+			title: "Default constructor serialization",
+			input: *NewAutoSubscriptionCreationOverride(),
+			want:  `{"allowAutoSubscriptionCreation":false}`,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.title, func(t *testing.T) {
+			encoded, err := json.Marshal(tc.input)
+			assert.NoError(t, err)
+			assert.Equal(t, tc.want, string(encoded))
+		})
+	}
+}
+
+// TestAutoSubscriptionCreationOverride_JSONDeserialization decodes JSON payloads and compares the result.
+func TestAutoSubscriptionCreationOverride_JSONDeserialization(t *testing.T) {
+	cases := []struct {
+		title     string
+		payload   string
+		want      AutoSubscriptionCreationOverride
+		expectErr bool
+	}{
+		{
+			title:     "False value deserialization",
+			payload:   `{"allowAutoSubscriptionCreation":false}`,
+			want:      AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: false},
+			expectErr: false,
+		},
+		{
+			title:     "True value deserialization",
+			payload:   `{"allowAutoSubscriptionCreation":true}`,
+			want:      AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: true},
+			expectErr: false,
+		},
+		{
+			title:     "Empty JSON object",
+			payload:   `{}`,
+			want:      AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: false},
+			expectErr: false,
+		},
+		{
+			title:     "Extra fields ignored",
+			payload:   `{"allowAutoSubscriptionCreation":true,"extraField":"ignored"}`,
+			want:      AutoSubscriptionCreationOverride{AllowAutoSubscriptionCreation: true},
+			expectErr: false,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.title, func(t *testing.T) {
+			var decoded AutoSubscriptionCreationOverride
+			err := json.Unmarshal([]byte(tc.payload), &decoded)
+
+			if tc.expectErr {
+				assert.Error(t, err)
+				return
+			}
+			assert.NoError(t, err)
+			assert.Equal(t, tc.want, decoded)
+		})
+	}
+}
+
+// TestAutoSubscriptionCreationOverride_FieldModification toggles the flag on a
+// constructor-built value and on a literal, checking each write is observed.
+func TestAutoSubscriptionCreationOverride_FieldModification(t *testing.T) {
+	// Start from the constructor: the flag is expected to be false.
+	fromCtor := NewAutoSubscriptionCreationOverride()
+	assert.Equal(t, false, fromCtor.AllowAutoSubscriptionCreation)
+
+	// Flip it on.
+	fromCtor.AllowAutoSubscriptionCreation = true
+	assert.Equal(t, true, fromCtor.AllowAutoSubscriptionCreation)
+
+	// And off again.
+	fromCtor.AllowAutoSubscriptionCreation = false
+	assert.Equal(t, false, fromCtor.AllowAutoSubscriptionCreation)
+
+	// Repeat with a literal that starts out enabled.
+	literal := &AutoSubscriptionCreationOverride{
+		AllowAutoSubscriptionCreation: true,
+	}
+	assert.Equal(t, true, literal.AllowAutoSubscriptionCreation)
+
+	literal.AllowAutoSubscriptionCreation = false
+	assert.Equal(t, false, literal.AllowAutoSubscriptionCreation)
+}
+
+// TestAutoSubscriptionCreationOverride_InvalidJSON feeds malformed or mistyped
+// payloads to json.Unmarshal and expects every one of them to be rejected.
+func TestAutoSubscriptionCreationOverride_InvalidJSON(t *testing.T) {
+	cases := []struct {
+		title   string
+		payload string
+	}{
+		{
+			title:   "Invalid JSON syntax",
+			payload: `{"allowAutoSubscriptionCreation":false`,
+		},
+		{
+			title:   "Invalid boolean value",
+			payload: `{"allowAutoSubscriptionCreation":"invalid"}`,
+		},
+		{
+			title:   "Invalid JSON structure",
+			payload: `[{"allowAutoSubscriptionCreation":true}]`,
+		},
+		{
+			title:   "Completely invalid JSON",
+			payload: `invalid json`,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.title, func(t *testing.T) {
+			assert.Error(t, json.Unmarshal([]byte(tc.payload), new(AutoSubscriptionCreationOverride)))
+		})
+	}
+}
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 58673dc..441aa33 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -486,6 +486,27 @@
 type DelayedDeliveryData struct {
 	TickTime float64 `json:"tickTime"`
 	Active   bool    `json:"active"`
+	// MaxDelayInMillis is optional: omitempty keeps a zero value out of the encoded
+	// JSON. Zero is meant as "no maximum delay limit" (backward compatible).
+	MaxDelayInMillis int64 `json:"maxDelayInMillis,omitempty"`
+}
+
+// NewDelayedDeliveryData builds a DelayedDeliveryData from tickTime and active only.
+func NewDelayedDeliveryData(tickTime float64, active bool) *DelayedDeliveryData {
+	// MaxDelayInMillis keeps its zero value (no limit) for backward compatibility.
+	data := new(DelayedDeliveryData)
+	data.TickTime = tickTime
+	data.Active = active
+	return data
+}
+
+// NewDelayedDeliveryDataWithMaxDelay builds a DelayedDeliveryData that also carries maxDelayMs.
+func NewDelayedDeliveryDataWithMaxDelay(tickTime float64, active bool, maxDelayMs int64) *DelayedDeliveryData {
+	data := new(DelayedDeliveryData)
+	data.TickTime = tickTime
+	data.Active = active
+	data.MaxDelayInMillis = maxDelayMs
+	return data
 }
 
 type DispatchRateData struct {
diff --git a/pulsaradmin/pkg/utils/offload_policies.go b/pulsaradmin/pkg/utils/offload_policies.go
new file mode 100644
index 0000000..f39ddad
--- /dev/null
+++ b/pulsaradmin/pkg/utils/offload_policies.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+//nolint:lll
+type OffloadPolicies struct { // Offload policy settings (de)serialized as JSON; every field is tagged omitempty, so zero values are not marshalled.
+	ManagedLedgerOffloadDriver                        string            `json:"managedLedgerOffloadDriver,omitempty"`
+	ManagedLedgerOffloadMaxThreads                    int               `json:"managedLedgerOffloadMaxThreads,omitempty"`
+	ManagedLedgerOffloadThresholdInBytes              int64             `json:"managedLedgerOffloadThresholdInBytes,omitempty"` // NOTE(review): omitempty also drops an explicit 0 — confirm 0 never has to be sent
+	ManagedLedgerOffloadDeletionLagInMillis           int64             `json:"managedLedgerOffloadDeletionLagInMillis,omitempty"`
+	ManagedLedgerOffloadAutoTriggerSizeThresholdBytes int64             `json:"managedLedgerOffloadAutoTriggerSizeThresholdBytes,omitempty"`
+	S3ManagedLedgerOffloadBucket                      string            `json:"s3ManagedLedgerOffloadBucket,omitempty"`
+	S3ManagedLedgerOffloadRegion                      string            `json:"s3ManagedLedgerOffloadRegion,omitempty"`
+	S3ManagedLedgerOffloadServiceEndpoint             string            `json:"s3ManagedLedgerOffloadServiceEndpoint,omitempty"`
+	S3ManagedLedgerOffloadCredentialID                string            `json:"s3ManagedLedgerOffloadCredentialId,omitempty"`
+	S3ManagedLedgerOffloadCredentialSecret            string            `json:"s3ManagedLedgerOffloadCredentialSecret,omitempty"`
+	S3ManagedLedgerOffloadRole                        string            `json:"s3ManagedLedgerOffloadRole,omitempty"`
+	S3ManagedLedgerOffloadRoleSessionName             string            `json:"s3ManagedLedgerOffloadRoleSessionName,omitempty"`
+	OffloadersDirectory                               string            `json:"offloadersDirectory,omitempty"`
+	ManagedLedgerOffloadDriverMetadata                map[string]string `json:"managedLedgerOffloadDriverMetadata,omitempty"`
+}
+
+// NewOffloadPolicies returns offload policies pre-filled with this package's defaults.
+func NewOffloadPolicies() *OffloadPolicies {
+	p := &OffloadPolicies{ManagedLedgerOffloadDriverMetadata: make(map[string]string)}
+	p.ManagedLedgerOffloadMaxThreads = 2
+	p.ManagedLedgerOffloadThresholdInBytes = -1
+	p.ManagedLedgerOffloadDeletionLagInMillis = 14400000 // 4 hours
+	p.ManagedLedgerOffloadAutoTriggerSizeThresholdBytes = -1
+	return p
+}
diff --git a/pulsaradmin/pkg/utils/offload_policies_test.go b/pulsaradmin/pkg/utils/offload_policies_test.go
new file mode 100644
index 0000000..c142d7b
--- /dev/null
+++ b/pulsaradmin/pkg/utils/offload_policies_test.go
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestNewOffloadPolicies checks the defaults NewOffloadPolicies sets and the zero values it leaves.
+func TestNewOffloadPolicies(t *testing.T) {
+	defaults := NewOffloadPolicies()
+	assert.NotNil(t, defaults)
+
+	// Numeric defaults chosen by the constructor.
+	assert.Equal(t, 2, defaults.ManagedLedgerOffloadMaxThreads)
+	assert.Equal(t, int64(-1), defaults.ManagedLedgerOffloadThresholdInBytes)
+	assert.Equal(t, int64(14400000), defaults.ManagedLedgerOffloadDeletionLagInMillis) // 4 hours
+	assert.Equal(t, int64(-1), defaults.ManagedLedgerOffloadAutoTriggerSizeThresholdBytes)
+
+	// The metadata map exists but holds no entries.
+	assert.NotNil(t, defaults.ManagedLedgerOffloadDriverMetadata)
+	assert.Equal(t, 0, len(defaults.ManagedLedgerOffloadDriverMetadata))
+
+	// All string fields stay empty.
+	assert.Equal(t, "", defaults.ManagedLedgerOffloadDriver)
+	assert.Equal(t, "", defaults.S3ManagedLedgerOffloadBucket)
+	assert.Equal(t, "", defaults.S3ManagedLedgerOffloadRegion)
+	assert.Equal(t, "", defaults.S3ManagedLedgerOffloadServiceEndpoint)
+	assert.Equal(t, "", defaults.S3ManagedLedgerOffloadCredentialID)
+	assert.Equal(t, "", defaults.S3ManagedLedgerOffloadCredentialSecret)
+	assert.Equal(t, "", defaults.S3ManagedLedgerOffloadRole)
+	assert.Equal(t, "", defaults.S3ManagedLedgerOffloadRoleSessionName)
+	assert.Equal(t, "", defaults.OffloadersDirectory)
+
+	// The dynamic type is a pointer to the struct.
+	assert.IsType(t, &OffloadPolicies{}, defaults)
+}
+
+// TestOffloadPolicies_JSONSerialization pins the JSON json.Marshal emits, including omitempty behavior.
+func TestOffloadPolicies_JSONSerialization(t *testing.T) {
+	cases := []struct {
+		title string
+		input OffloadPolicies
+		want  string
+	}{
+		//nolint:lll
+		{
+			title: "Default constructor serialization (omitempty behavior)",
+			input: *NewOffloadPolicies(),
+			want:  `{"managedLedgerOffloadMaxThreads":2,"managedLedgerOffloadThresholdInBytes":-1,"managedLedgerOffloadDeletionLagInMillis":14400000,"managedLedgerOffloadAutoTriggerSizeThresholdBytes":-1}`,
+		},
+		//nolint:lll
+		{
+			title: "Full object serialization",
+			input: OffloadPolicies{
+				ManagedLedgerOffloadDriver:                        "s3",
+				ManagedLedgerOffloadMaxThreads:                    4,
+				ManagedLedgerOffloadThresholdInBytes:              1000000,
+				ManagedLedgerOffloadDeletionLagInMillis:           7200000,
+				ManagedLedgerOffloadAutoTriggerSizeThresholdBytes: 500000,
+				S3ManagedLedgerOffloadBucket:                      "test-bucket",
+				S3ManagedLedgerOffloadRegion:                      "us-west-2",
+				S3ManagedLedgerOffloadServiceEndpoint:             "https://s3.us-west-2.amazonaws.com",
+				S3ManagedLedgerOffloadCredentialID:                "access-key",
+				S3ManagedLedgerOffloadCredentialSecret:            "secret-key",
+				S3ManagedLedgerOffloadRole:                        "test-role",
+				S3ManagedLedgerOffloadRoleSessionName:             "test-session",
+				OffloadersDirectory:                               "/opt/offloaders",
+				ManagedLedgerOffloadDriverMetadata:                map[string]string{"key1": "value1", "key2": "value2"},
+			},
+			want: `{"managedLedgerOffloadDriver":"s3","managedLedgerOffloadMaxThreads":4,"managedLedgerOffloadThresholdInBytes":1000000,"managedLedgerOffloadDeletionLagInMillis":7200000,"managedLedgerOffloadAutoTriggerSizeThresholdBytes":500000,"s3ManagedLedgerOffloadBucket":"test-bucket","s3ManagedLedgerOffloadRegion":"us-west-2","s3ManagedLedgerOffloadServiceEndpoint":"https://s3.us-west-2.amazonaws.com","s3ManagedLedgerOffloadCredentialId":"access-key","s3ManagedLedgerOffloadCredentialSecret":"secret-key","s3ManagedLedgerOffloadRole":"test-role","s3ManagedLedgerOffloadRoleSessionName":"test-session","offloadersDirectory":"/opt/offloaders","managedLedgerOffloadDriverMetadata":{"key1":"value1","key2":"value2"}}`,
+		},
+		{
+			title: "Empty struct serialization (zero values omitted)",
+			input: OffloadPolicies{},
+			want:  `{}`,
+		},
+		{
+			title: "Non-empty map serialization",
+			input: OffloadPolicies{
+				ManagedLedgerOffloadDriverMetadata: map[string]string{"key": "value"},
+			},
+			want: `{"managedLedgerOffloadDriverMetadata":{"key":"value"}}`,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.title, func(t *testing.T) {
+			encoded, err := json.Marshal(tc.input)
+			assert.NoError(t, err)
+			assert.Equal(t, tc.want, string(encoded))
+		})
+	}
+}
+
+// TestOffloadPolicies_JSONDeserialization decodes JSON payloads into OffloadPolicies and compares them.
+func TestOffloadPolicies_JSONDeserialization(t *testing.T) {
+	cases := []struct {
+		title     string
+		payload   string
+		want      OffloadPolicies
+		expectErr bool
+	}{
+		//nolint:lll
+		{
+			title:   "Complete JSON deserialization",
+			payload: `{"managedLedgerOffloadDriver":"s3","managedLedgerOffloadMaxThreads":4,"managedLedgerOffloadThresholdInBytes":1000000,"managedLedgerOffloadDeletionLagInMillis":7200000,"managedLedgerOffloadAutoTriggerSizeThresholdBytes":500000,"s3ManagedLedgerOffloadBucket":"test-bucket","s3ManagedLedgerOffloadRegion":"us-west-2","s3ManagedLedgerOffloadServiceEndpoint":"https://s3.us-west-2.amazonaws.com","s3ManagedLedgerOffloadCredentialId":"access-key","s3ManagedLedgerOffloadCredentialSecret":"secret-key","s3ManagedLedgerOffloadRole":"test-role","s3ManagedLedgerOffloadRoleSessionName":"test-session","offloadersDirectory":"/opt/offloaders","managedLedgerOffloadDriverMetadata":{"key1":"value1","key2":"value2"}}`,
+			want: OffloadPolicies{
+				ManagedLedgerOffloadDriver:                        "s3",
+				ManagedLedgerOffloadMaxThreads:                    4,
+				ManagedLedgerOffloadThresholdInBytes:              1000000,
+				ManagedLedgerOffloadDeletionLagInMillis:           7200000,
+				ManagedLedgerOffloadAutoTriggerSizeThresholdBytes: 500000,
+				S3ManagedLedgerOffloadBucket:                      "test-bucket",
+				S3ManagedLedgerOffloadRegion:                      "us-west-2",
+				S3ManagedLedgerOffloadServiceEndpoint:             "https://s3.us-west-2.amazonaws.com",
+				S3ManagedLedgerOffloadCredentialID:                "access-key",
+				S3ManagedLedgerOffloadCredentialSecret:            "secret-key",
+				S3ManagedLedgerOffloadRole:                        "test-role",
+				S3ManagedLedgerOffloadRoleSessionName:             "test-session",
+				OffloadersDirectory:                               "/opt/offloaders",
+				ManagedLedgerOffloadDriverMetadata:                map[string]string{"key1": "value1", "key2": "value2"},
+			},
+			expectErr: false,
+		},
+		//nolint:lll
+		{
+			title:   "Partial JSON deserialization",
+			payload: `{"managedLedgerOffloadDriver":"s3","managedLedgerOffloadMaxThreads":4,"s3ManagedLedgerOffloadBucket":"test-bucket"}`,
+			want: OffloadPolicies{
+				ManagedLedgerOffloadDriver:     "s3",
+				ManagedLedgerOffloadMaxThreads: 4,
+				S3ManagedLedgerOffloadBucket:   "test-bucket",
+				// Fields absent from the payload keep their zero values
+			},
+			expectErr: false,
+		},
+		{
+			title:   "Empty JSON object",
+			payload: `{}`,
+			want: OffloadPolicies{
+				// Nothing is set, so every field is its zero value
+			},
+			expectErr: false,
+		},
+		{
+			title:   "JSON with metadata map",
+			payload: `{"managedLedgerOffloadDriverMetadata":{"env":"prod","version":"1.0"}}`,
+			want: OffloadPolicies{
+				ManagedLedgerOffloadDriverMetadata: map[string]string{"env": "prod", "version": "1.0"},
+			},
+			expectErr: false,
+		},
+		{
+			title:   "JSON with extra fields ignored",
+			payload: `{"managedLedgerOffloadDriver":"s3","extraField":"ignored","managedLedgerOffloadMaxThreads":4}`,
+			want: OffloadPolicies{
+				ManagedLedgerOffloadDriver:     "s3",
+				ManagedLedgerOffloadMaxThreads: 4,
+			},
+			expectErr: false,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.title, func(t *testing.T) {
+			var decoded OffloadPolicies
+			err := json.Unmarshal([]byte(tc.payload), &decoded)
+
+			if tc.expectErr {
+				assert.Error(t, err)
+				return
+			}
+			assert.NoError(t, err)
+			assert.Equal(t, tc.want, decoded)
+		})
+	}
+}
+
+// TestOffloadPolicies_FieldModification checks that field writes are observable on reads.
+func TestOffloadPolicies_FieldModification(t *testing.T) {
+	fromCtor := NewOffloadPolicies()
+
+	// String fields.
+	fromCtor.ManagedLedgerOffloadDriver = "s3"
+	assert.Equal(t, "s3", fromCtor.ManagedLedgerOffloadDriver)
+
+	fromCtor.S3ManagedLedgerOffloadBucket = "test-bucket"
+	assert.Equal(t, "test-bucket", fromCtor.S3ManagedLedgerOffloadBucket)
+
+	// The int field.
+	fromCtor.ManagedLedgerOffloadMaxThreads = 8
+	assert.Equal(t, 8, fromCtor.ManagedLedgerOffloadMaxThreads)
+
+	// int64 fields.
+	fromCtor.ManagedLedgerOffloadThresholdInBytes = 5000000
+	assert.Equal(t, int64(5000000), fromCtor.ManagedLedgerOffloadThresholdInBytes)
+
+	fromCtor.ManagedLedgerOffloadDeletionLagInMillis = 3600000 // 1 hour
+	assert.Equal(t, int64(3600000), fromCtor.ManagedLedgerOffloadDeletionLagInMillis)
+
+	// Same exercise on a value built from a literal.
+	literal := &OffloadPolicies{
+		ManagedLedgerOffloadDriver:     "gcs",
+		ManagedLedgerOffloadMaxThreads: 4,
+		S3ManagedLedgerOffloadBucket:   "initial-bucket",
+	}
+
+	assert.Equal(t, "gcs", literal.ManagedLedgerOffloadDriver)
+	assert.Equal(t, 4, literal.ManagedLedgerOffloadMaxThreads)
+	assert.Equal(t, "initial-bucket", literal.S3ManagedLedgerOffloadBucket)
+
+	// Change two fields; the bucket must stay untouched.
+	literal.ManagedLedgerOffloadDriver = "azure"
+	literal.ManagedLedgerOffloadMaxThreads = 6
+
+	assert.Equal(t, "azure", literal.ManagedLedgerOffloadDriver)
+	assert.Equal(t, 6, literal.ManagedLedgerOffloadMaxThreads)
+	assert.Equal(t, "initial-bucket", literal.S3ManagedLedgerOffloadBucket)
+}
+
+// TestOffloadPolicies_MapOperations uses ManagedLedgerOffloadDriverMetadata as an ordinary Go map.
+func TestOffloadPolicies_MapOperations(t *testing.T) {
+	fromCtor := NewOffloadPolicies()
+
+	// The constructor allocates an empty, non-nil map.
+	assert.NotNil(t, fromCtor.ManagedLedgerOffloadDriverMetadata)
+	assert.Equal(t, 0, len(fromCtor.ManagedLedgerOffloadDriverMetadata))
+
+	// Insert two entries and read them back.
+	fromCtor.ManagedLedgerOffloadDriverMetadata["key1"] = "value1"
+	fromCtor.ManagedLedgerOffloadDriverMetadata["key2"] = "value2"
+	assert.Equal(t, 2, len(fromCtor.ManagedLedgerOffloadDriverMetadata))
+	assert.Equal(t, "value1", fromCtor.ManagedLedgerOffloadDriverMetadata["key1"])
+	assert.Equal(t, "value2", fromCtor.ManagedLedgerOffloadDriverMetadata["key2"])
+
+	// Overwrite an existing entry.
+	fromCtor.ManagedLedgerOffloadDriverMetadata["key1"] = "modified_value1"
+	assert.Equal(t, "modified_value1", fromCtor.ManagedLedgerOffloadDriverMetadata["key1"])
+
+	// Delete an entry and make sure it is gone.
+	delete(fromCtor.ManagedLedgerOffloadDriverMetadata, "key2")
+	assert.Equal(t, 1, len(fromCtor.ManagedLedgerOffloadDriverMetadata))
+	_, present := fromCtor.ManagedLedgerOffloadDriverMetadata["key2"]
+	assert.False(t, present)
+
+	// A bare literal leaves the map nil (writing to it in that state would panic).
+	bare := &OffloadPolicies{}
+	assert.Nil(t, bare.ManagedLedgerOffloadDriverMetadata)
+
+	// Allocate the map before writing to it.
+	bare.ManagedLedgerOffloadDriverMetadata = make(map[string]string)
+	bare.ManagedLedgerOffloadDriverMetadata["test"] = "value"
+	assert.Equal(t, "value", bare.ManagedLedgerOffloadDriverMetadata["test"])
+}
+
+// TestOffloadPolicies_InvalidJSON feeds malformed or mistyped payloads to
+// json.Unmarshal and expects every one of them to be rejected.
+func TestOffloadPolicies_InvalidJSON(t *testing.T) {
+	cases := []struct {
+		title   string
+		payload string
+	}{
+		{
+			title:   "Invalid JSON syntax",
+			payload: `{"managedLedgerOffloadDriver":"s3"`,
+		},
+		{
+			title:   "Invalid int value",
+			payload: `{"managedLedgerOffloadMaxThreads":"invalid"}`,
+		},
+		{
+			title:   "Invalid int64 value",
+			payload: `{"managedLedgerOffloadThresholdInBytes":"invalid"}`,
+		},
+		{
+			title:   "Invalid JSON structure",
+			payload: `[{"managedLedgerOffloadDriver":"s3"}]`,
+		},
+		{
+			title:   "Invalid map structure",
+			payload: `{"managedLedgerOffloadDriverMetadata":"invalid"}`,
+		},
+		{
+			title:   "Completely invalid JSON",
+			payload: `invalid json`,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.title, func(t *testing.T) {
+			assert.Error(t, json.Unmarshal([]byte(tc.payload), new(OffloadPolicies)))
+		})
+	}
+}