[improve][broker] Consistently add fine-grain authorization to REST API (#22202)

(cherry picked from commit 68c10925df43769eee7265b4af0ac8ee4913e715)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1fc46f9..94fb1f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -75,6 +75,7 @@
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
@@ -356,7 +357,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -379,7 +381,8 @@
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -401,7 +404,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -425,7 +429,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
             .thenApply(asyncResponse::resume).exceptionally(ex -> {
                 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse);
@@ -449,7 +454,8 @@
             @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
                     Integer maxUnackedNum) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -471,7 +477,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -494,7 +501,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
             .thenAccept(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
@@ -522,7 +530,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -544,7 +553,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -568,7 +578,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetInactiveTopicPolicies(applied, isGlobal))
             .thenApply(asyncResponse::resume).exceptionally(ex -> {
                 handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
@@ -591,7 +602,8 @@
             @ApiParam(value = "inactive topic policies for the specified topic")
             InactiveTopicPolicies inactiveTopicPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -613,7 +625,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetInactiveTopicPolicies(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -637,7 +650,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -712,7 +726,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -1909,16 +1924,17 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalExamineMessageAsync(initialPosition, messagePosition, authoritative)
-                .thenAccept(asyncResponse::resume)
-                .exceptionally(ex -> {
-                    if (isNot307And404Exception(ex)) {
-                        log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName,
-                                ex);
-                    }
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
+        validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)
+            .thenCompose(__ -> internalExamineMessageAsync(initialPosition, messagePosition, authoritative))
+            .thenAccept(asyncResponse::resume)
+            .exceptionally(ex -> {
+                if (isNot307And404Exception(ex)) {
+                    log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName,
+                            ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
@@ -2025,7 +2041,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalGetBacklogAsync(authoritative)
+        validateTopicOperationAsync(topicName, TopicOperation.GET_BACKLOG_SIZE)
+                .thenCompose(__ ->  internalGetBacklogAsync(authoritative))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     Throwable t = FutureUtil.unwrapCompletionException(ex);
@@ -2081,7 +2098,8 @@
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetBacklogQuota(applied, isGlobal))
             .thenAccept(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2164,7 +2182,8 @@
                                       + "For internal use.")
                               @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
                 .thenAccept(op -> {
                     asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(() -> {
@@ -2246,7 +2265,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
             .thenAccept(op -> asyncResponse.resume(op
                 .map(TopicPolicies::getMessageTTLInSeconds)
@@ -2283,7 +2303,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMessageTTL(messageTTL, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2310,7 +2331,8 @@
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMessageTTL(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2336,7 +2358,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetDeduplication(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2363,7 +2386,8 @@
             @ApiParam(value = "DeduplicationEnabled policies for the specified topic")
                     Boolean enabled) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetDeduplication(enabled, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2388,7 +2412,8 @@
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetDeduplication(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2494,7 +2519,6 @@
                 return null;
             });
     }
-
     @GET
     @Path("/{tenant}/{namespace}/{topic}/persistence")
     @ApiOperation(
@@ -2515,7 +2539,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetPersistence(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2543,7 +2568,8 @@
             @ApiParam(value = "Bookkeeper persistence policies for specified topic")
                                            PersistencePolicies persistencePolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetPersistence(persistencePolicies, isGlobal))
             .thenRun(() -> {
                 try {
@@ -2579,7 +2605,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemovePersistence(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}",
@@ -2610,7 +2637,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic(isGlobal))
             .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
                     : Response.noContent().build()))
@@ -2638,7 +2666,8 @@
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}"
@@ -2668,7 +2697,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}",
@@ -2698,7 +2728,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetReplicatorDispatchRate(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2725,7 +2756,8 @@
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
@@ -2755,7 +2787,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetReplicatorDispatchRate(null, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
@@ -2785,7 +2818,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxProducers(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2812,7 +2846,8 @@
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "The max producers of the topic") int maxProducers) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxProducers(maxProducers, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}",
@@ -2844,7 +2879,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveMaxProducers(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove max producers: namespace={}, topic={}",
@@ -2876,7 +2912,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxConsumers(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2903,7 +2940,8 @@
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "The max consumers of the topic") int maxConsumers) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxConsumers(maxConsumers, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}",
@@ -2935,7 +2973,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveMaxConsumers(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove max consumers: namespace={}, topic={}",
@@ -2966,7 +3005,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxMessageSize(isGlobal))
             .thenAccept(policies -> {
                 asyncResponse.resume(policies.isPresent() ? policies.get() : Response.noContent().build());
@@ -2995,7 +3035,8 @@
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "The max message size of the topic") int maxMessageSize) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxMessageSize(maxMessageSize, isGlobal))
             .thenRun(() -> {
                 log.info(
@@ -3029,7 +3070,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxMessageSize(null, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove max message size: namespace={}, topic={}",
@@ -3337,7 +3379,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetDispatchRate(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -3363,7 +3406,8 @@
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Dispatch rate for the specified topic") DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetDispatchRate(dispatchRate, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3399,7 +3443,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveDispatchRate(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}",
@@ -3435,7 +3480,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetSubscriptionDispatchRate(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -3463,7 +3509,8 @@
             @ApiParam(value = "Subscription message dispatch rate for the specified topic")
                     DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3499,7 +3546,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveSubscriptionDispatchRate(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
@@ -3533,7 +3581,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetSubscriptionLevelDispatchRate(
                     Codec.decode(encodedSubscriptionName), applied, isGlobal))
             .thenApply(asyncResponse::resume)
@@ -3563,7 +3612,8 @@
             @ApiParam(value = "Subscription message dispatch rate for the specified topic")
                     DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetSubscriptionLevelDispatchRate(
                     Codec.decode(encodedSubscriptionName), dispatchRate, isGlobal))
             .thenRun(() -> {
@@ -3601,7 +3651,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveSubscriptionLevelDispatchRate(
                     Codec.decode(encodedSubscriptionName), isGlobal))
             .thenRun(() -> {
@@ -3633,7 +3684,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetCompactionThreshold(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -3659,7 +3711,8 @@
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3695,7 +3748,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveCompactionThreshold(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
@@ -3730,7 +3784,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxConsumersPerSubscription(isGlobal))
             .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
                     : Response.noContent().build()))
@@ -3758,7 +3813,8 @@
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3794,7 +3850,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveMaxConsumersPerSubscription(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic max consumers per subscription:"
@@ -3827,7 +3884,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetPublishRate(isGlobal))
             .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
                     : Response.noContent().build()))
@@ -3854,7 +3912,8 @@
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetPublishRate(publishRate, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3891,7 +3950,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemovePublishRate(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}",
@@ -3928,7 +3988,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetSubscriptionTypesEnabled(isGlobal))
             .thenAccept(op -> {
                 asyncResponse.resume(op.isPresent() ? op.get()
@@ -3958,7 +4019,8 @@
             @ApiParam(value = "Enable sub types for the specified topic")
             Set<SubscriptionType> subscriptionTypesEnabled) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3994,7 +4056,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalRemoveSubscriptionTypesEnabled(isGlobal))
                 .thenRun(() -> {
                     log.info("[{}] Successfully remove subscription types enabled: namespace={}, topic={}",
@@ -4026,7 +4089,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalGetSubscribeRate(applied, isGlobal))
                 .thenApply(asyncResponse::resume).exceptionally(ex -> {
             handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
@@ -4052,7 +4116,8 @@
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetSubscribeRate(subscribeRate, isGlobal))
             .thenRun(() -> {
                 try {
@@ -4090,7 +4155,8 @@
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveSubscribeRate(isGlobal))
             .thenRun(() -> {
                 log.info(
@@ -4338,7 +4404,8 @@
                                                     + "broker. For internal use.")
                                             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalGetSchemaValidationEnforced(applied))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
@@ -4365,7 +4432,8 @@
                                             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
                                             @ApiParam(required = true) boolean schemaValidationEnforced) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalSetSchemaValidationEnforced(schemaValidationEnforced))
                 .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
@@ -4561,7 +4629,8 @@
             @ApiParam(value = "Settings for automatic subscription creation")
             AutoSubscriptionCreationOverrideImpl autoSubscriptionCreationOverride) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalSetAutoSubscriptionCreation(autoSubscriptionCreationOverride, isGlobal))
                 .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
@@ -4587,7 +4656,8 @@
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalGetAutoSubscriptionCreation(applied, isGlobal))
                 .thenApply(asyncResponse::resume).exceptionally(ex -> {
                     handleTopicPolicyException("getAutoSubscriptionCreation", ex, asyncResponse);
@@ -4612,7 +4682,8 @@
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalSetAutoSubscriptionCreation(null, isGlobal))
                 .thenRun(() -> {
                     log.info(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
index f07b9a6..bcb8e32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.awaitility.Awaitility.await;
 import io.jsonwebtoken.Jwts;
 import java.util.Set;
 import java.util.UUID;
@@ -27,6 +28,8 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.security.MockedPulsarStandalone;
@@ -35,8 +38,6 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.awaitility.Awaitility.await;
-
 
 public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {
 
@@ -172,4 +173,311 @@
         }
     }
 
+    @SneakyThrows
+    @Test
+    public void testOffloadPolicy() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        // mocked data
+        final OffloadPoliciesImpl definedOffloadPolicies = new OffloadPoliciesImpl();
+        definedOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(100L);
+        definedOffloadPolicies.setManagedLedgerOffloadThresholdInSeconds(100L);
+        definedOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(200L);
+        definedOffloadPolicies.setManagedLedgerOffloadDriver("s3");
+        definedOffloadPolicies.setManagedLedgerOffloadBucket("buck");
+
+        // test superuser
+        superUserAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
+
+        // because the topic policies is eventual consistency, we should wait here
+        await().untilAsserted(() -> {
+            final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
+        });
+        superUserAdmin.topicPolicies().removeOffloadPolicies(topic);
+
+        await().untilAsserted(() -> {
+            final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.assertNull(offloadPolicy);
+        });
+
+        // test tenant manager
+
+        tenantManagerAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
+        await().untilAsserted(() -> {
+            final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
+        });
+        tenantManagerAdmin.topicPolicies().removeOffloadPolicies(topic);
+        await().untilAsserted(() -> {
+            final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.assertNull(offloadPolicy);
+        });
+
+        // test nobody
+
+        try {
+            subAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+
+            subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().removeOffloadPolicies(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        // test sub user with permissions
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                    subject, Set.of(action));
+            try {
+                subAdmin.topicPolicies().getOffloadPolicies(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+
+                subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().removeOffloadPolicies(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+            superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+        }
+    }
+
+    @SneakyThrows
+    @Test
+    public void testMaxUnackedMessagesOnConsumer() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        // mocked data
+        int definedUnackedMessagesOnConsumer = 100;
+
+        // test superuser
+        superUserAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+
+        // because the topic policies is eventual consistency, we should wait here
+        await().untilAsserted(() -> {
+            final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnConsumer(topic);
+            Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+        });
+        superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+
+        await().untilAsserted(() -> {
+            final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+            Assert.assertNull(unackedMessagesOnConsumer);
+        });
+
+        // test tenant manager
+
+        tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+        await().untilAsserted(() -> {
+            final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+            Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+        });
+        tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+        await().untilAsserted(() -> {
+            final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+            Assert.assertNull(unackedMessagesOnConsumer);
+        });
+
+        // test nobody
+
+        try {
+            subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+
+            subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        // test sub user with permissions
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                    subject, Set.of(action));
+            try {
+                subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+
+                subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+            superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+        }
+    }
+
+    @SneakyThrows
+    @Test
+    public void testMaxUnackedMessagesOnSubscription() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        // mocked data
+        int definedUnackedMessagesOnConsumer = 100;
+
+        // test superuser
+        superUserAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+
+        // because the topic policies is eventual consistency, we should wait here
+        await().untilAsserted(() -> {
+            final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnSubscription(topic);
+            Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+        });
+        superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+
+        await().untilAsserted(() -> {
+            final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnSubscription(topic);
+            Assert.assertNull(unackedMessagesOnConsumer);
+        });
+
+        // test tenant manager
+
+        tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+        await().untilAsserted(() -> {
+            final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+            Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+        });
+        tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+        await().untilAsserted(() -> {
+            final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnSubscription(topic);
+            Assert.assertNull(unackedMessagesOnConsumer);
+        });
+
+        // test nobody
+
+        try {
+            subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+
+            subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        // test sub user with permissions
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                    subject, Set.of(action));
+            try {
+                subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+
+                subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+            superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+        }
+
+    }
 }