Add authoritative flag for topic policy to avoid redirect loop (#11051)

* Add authoritative flag for topic policy to avoid redirect loop

1. Add authoritative flag for topic policy to avoid redirect loop
2. Prevent set topic policy on a non-existing topic
3. Prevent set topic policy on a partition of a partitioned topic
4. Redirect to the broker which is owner of the partition-0 for a partitioned topic when setting topic policy
5. Don't remove policy cache when the topic removed from the broker,
   this will lead to the topic come back, but can't find the topic policy,
   since the namespace does not removed from the broker, we will not read from
   the system topic again. For this case we already handled when the broker does not
   provide service for that namespace, the topic policy cache under the namespace will be removed.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ff54668..29f3efc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -499,31 +499,17 @@
         });
     }
 
-    protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse,
-                                                      DelayedDeliveryPolicies deliveryPolicies) {
-        TopicPolicies topicPolicies = null;
+    protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies) {
+        TopicPolicies topicPolicies;
         try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies have not been initialized yet.", topicName);
-            asyncResponse.resume(new RestException(e));
-            return;
+            topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+            topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
+            topicPolicies.setDelayedDeliveryTickTimeMillis(
+                    deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
         }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
-        topicPolicies.setDelayedDeliveryTickTimeMillis(
-                deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
-        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
-                .whenComplete((result, ex) -> {
-                    if (ex != null) {
-                        log.error("Failed set delayed delivery policy for topic", ex);
-                        asyncResponse.resume(new RestException(ex));
-                    } else {
-                        asyncResponse.resume(Response.noContent().build());
-                    }
-                });
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
     }
 
     private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
@@ -775,15 +761,21 @@
     }
 
     protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(boolean applied) {
-        OffloadPoliciesImpl offloadPolicies =
-                getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null);
-        if (applied) {
-            OffloadPoliciesImpl namespacePolicy =
-                    (OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies;
-            offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(offloadPolicies
-                    , namespacePolicy, pulsar().getConfiguration().getProperties());
+        CompletableFuture<OffloadPoliciesImpl> res = new CompletableFuture<>();
+        try {
+            OffloadPoliciesImpl offloadPolicies =
+                    getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null);
+            if (applied) {
+                OffloadPoliciesImpl namespacePolicy =
+                        (OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies;
+                offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(offloadPolicies
+                        , namespacePolicy, pulsar().getConfiguration().getProperties());
+            }
+            res.complete(offloadPolicies);
+        } catch (Exception e) {
+            res.completeExceptionally(e);
         }
-        return CompletableFuture.completedFuture(offloadPolicies);
+        return res;
     }
 
     protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
@@ -2555,24 +2547,18 @@
         return quotaMap;
     }
 
-    protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
-                                           BacklogQuota.BacklogQuotaType backlogQuotaType,
+    protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
                                            BacklogQuotaImpl backlogQuota) {
         validateTopicPolicyOperation(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
+        TopicPolicies topicPolicies;
         if (backlogQuotaType == null) {
             backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage;
         }
-        TopicPolicies topicPolicies;
         try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies have not been initialized yet.", topicName);
-            asyncResponse.resume(new RestException(e));
-            return;
-        }
-        if (topicPolicies == null){
-            topicPolicies = new TopicPolicies();
+            topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
         }
 
         RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
@@ -2580,10 +2566,9 @@
             log.warn(
                     "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
                     clientAppId(), topicName);
-            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
                     "Backlog Quota exceeds configured retention quota for topic. "
                             + "Please increase retention quota and retry"));
-            return;
         }
 
         if (backlogQuota != null) {
@@ -2592,22 +2577,15 @@
             topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
         }
         Map<String, BacklogQuotaImpl> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
-        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
-                .whenComplete((r, ex) -> {
-                    if (ex != null) {
-                        log.error("Failed updated backlog quota map", ex);
-                        asyncResponse.resume(new RestException(ex));
-                    } else {
-                        try {
-                            log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
-                                    clientAppId(),
-                                    namespaceName,
-                                    topicName.getLocalName(),
-                                    jsonMapper().writeValueAsString(backLogQuotaMap));
-                        } catch (JsonProcessingException ignore) { }
-                        asyncResponse.resume(Response.noContent().build());
-                    }
-                });
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
+            try {
+                log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
+                        clientAppId(),
+                        namespaceName,
+                        topicName.getLocalName(),
+                        jsonMapper().writeValueAsString(backLogQuotaMap));
+            } catch (JsonProcessingException ignore) { }
+        });
     }
 
     protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied) {
@@ -2638,38 +2616,26 @@
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
     }
 
-    protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) {
+    protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond) {
         //Validate message ttl value.
         if (ttlInSecond != null && ttlInSecond < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Invalid value for message TTL"));
         }
         TopicPolicies topicPolicies;
-        //Update existing topic policy or create a new one if not exist.
         try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies have not been initialized yet.", topicName);
-            asyncResponse.resume(new RestException(e));
-            return;
-        }
-        if (topicPolicies == null){
-            topicPolicies = new TopicPolicies();
+            topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
         }
         topicPolicies.setMessageTTLInSeconds(ttlInSecond);
-        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
-                .whenComplete((result, ex) -> {
-                    if (ex != null) {
-                        log.error("Failed set message ttl for topic", ex);
-                        asyncResponse.resume(new RestException(ex));
-                    } else {
-                        log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
-                                clientAppId(),
-                                namespaceName,
-                                topicName.getLocalName(),
-                                ttlInSecond);
-                        asyncResponse.resume(Response.noContent().build());
-                    }
-                });
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
+            log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
+                    clientAppId(),
+                    namespaceName,
+                    topicName.getLocalName(),
+                    ttlInSecond);
+        });
     }
 
     private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) {
@@ -2686,13 +2652,8 @@
         return retentionPolicies;
     }
 
-    protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
-            BacklogQuota.BacklogQuotaType backlogQuotaType) {
-        internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
-    }
-
-    protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied){
-        RetentionPolicies retentionPolicies = getTopicPolicies(topicName)
+    protected RetentionPolicies internalGetRetention(boolean applied) {
+        return getTopicPolicies(topicName)
                 .map(TopicPolicies::getRetentionPolicies).orElseGet(() -> {
                     if (applied) {
                         RetentionPolicies policies = getNamespacePolicies(namespaceName).retention_policies;
@@ -2702,15 +2663,19 @@
                     }
                     return null;
                 });
-        asyncResponse.resume(retentionPolicies == null ? Response.noContent().build() : retentionPolicies);
     }
 
     protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
         if (retention == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-                .orElseGet(TopicPolicies::new);
+        TopicPolicies topicPolicies;
+        try {
+            topicPolicies = getTopicPolicies(topicName)
+                    .orElseGet(TopicPolicies::new);
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
+        }
         BacklogQuota backlogQuota =
                     topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
         if (backlogQuota == null) {
@@ -2721,9 +2686,9 @@
             log.warn(
                     "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
                     clientAppId(), topicName);
-            throw new RestException(Status.PRECONDITION_FAILED,
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
                     "Retention Quota must exceed configured backlog quota for topic. "
-                            + "Please increase retention quota and retry");
+                            + "Please increase retention quota and retry"));
         }
         topicPolicies.setRetentionPolicies(retention);
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
@@ -2849,12 +2814,30 @@
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
     }
 
-    protected void preValidation() {
+    protected CompletableFuture<Void> preValidation(boolean authoritative) {
         checkTopicLevelPolicyEnable();
+        if (topicName.isPartitioned()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Not allowed to set/get topic policy for a partition"));
+        }
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        validateTopicOwnership(topicName, false);
+        return checkTopicExistsAsync(topicName).thenCompose(exist -> {
+            if (!exist) {
+                throw new RestException(Status.NOT_FOUND, "Topic not found");
+            } else {
+                return getPartitionedTopicMetadataAsync(topicName, false, false)
+                    .thenCompose(metadata -> {
+                        if (metadata.partitions > 0) {
+                            return validateTopicOwnershipAsync(TopicName.get(topicName.toString()
+                                    + TopicName.PARTITIONED_TOPIC_SUFFIX + 0), authoritative);
+                        } else {
+                            return validateTopicOwnershipAsync(topicName, authoritative);
+                        }
+                    });
+            }
+        });
     }
 
     protected CompletableFuture<Void> internalRemoveMaxProducers() {
@@ -3970,6 +3953,16 @@
         }
     }
 
+    protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
+        Throwable cause = thr.getCause();
+        if (!(cause instanceof WebApplicationException)
+                || !(((WebApplicationException) cause).getResponse().getStatus() == 307)) {
+            log.error("[{}] Failed to perform {} on topic {}",
+                    clientAppId(), methodName, topicName, cause);
+        }
+        resumeAsyncResponseExceptionally(asyncResponse, cause);
+    }
+
     protected void internalTruncateNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
         Topic topic;
         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 56e5ce1..018749a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -25,7 +25,6 @@
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -56,7 +55,6 @@
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
@@ -279,23 +277,20 @@
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
             @ApiResponse(code = 500, message = "Internal server error"), })
     public void getOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
-                                                    @PathParam("tenant") String tenant,
-                                                    @PathParam("namespace") String namespace,
-                                                    @PathParam("topic") @Encoded String encodedTopic,
-                                                    @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetOffloadPolicies(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get offloadPolicies", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get offloadPolicies", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetOffloadPolicies(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getOffloadPolicies", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -304,15 +299,20 @@
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), })
     public void setOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
-                                                    @PathParam("tenant") String tenant,
-                                                    @PathParam("namespace") String namespace,
-                                                    @PathParam("topic") @Encoded String encodedTopic,
-                                                    @ApiParam(value = "Offload policies for the specified topic")
-                                           OffloadPoliciesImpl offloadPolicies) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(tenant);
-        internalSetOffloadPolicies(offloadPolicies).whenComplete((res, ex)
-                -> internalHandleResult(asyncResponse, res, ex, "Failed set offloadPolicies"));
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setOffloadPolicies", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -321,12 +321,19 @@
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), })
     public void removeOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
-                                      @PathParam("tenant") String tenant,
-                                      @PathParam("namespace") String namespace,
-                                      @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        setOffloadPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetOffloadPolicies(null))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeOffloadPolicies", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -336,23 +343,19 @@
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
             @ApiResponse(code = 500, message = "Internal server error"), })
     public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
-                                                    @PathParam("tenant") String tenant,
-                                                    @PathParam("namespace") String namespace,
-                                                    @PathParam("topic") @Encoded String encodedTopic,
-                                                    @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetMaxUnackedMessagesOnConsumer(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get maxUnackedMessagesOnConsumer", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get maxUnackedMessagesOnConsumer", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied))
+            .thenApply(asyncResponse::resume).exceptionally(ex -> {
+                handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -365,12 +368,39 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
                     Integer maxUnackedNum) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex)
-                        -> internalHandleResult(asyncResponse, res, ex, "Failed set MaxUnackedMessagesOnConsumer"));
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setMaxUnackedMessagesOnConsumer", ex, asyncResponse);
+                return null;
+            });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
+    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), })
+    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("deleteMaxUnackedMessagesOnConsumer", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -380,17 +410,21 @@
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
             @ApiResponse(code = 500, message = "Internal server error"), })
     public void getDeduplicationSnapshotInterval(@Suspended final AsyncResponse asyncResponse,
-                                                    @PathParam("tenant") String tenant,
-                                                    @PathParam("namespace") String namespace,
-                                                    @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-        if (topicPolicies.isDeduplicationSnapshotIntervalSecondsSet()) {
-            asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
-        } else {
-            asyncResponse.resume(Response.noContent().build());
-        }
+        preValidation(authoritative)
+            .thenRun(() -> {
+                TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+                asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getDeduplicationSnapshotInterval", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -404,14 +438,17 @@
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Interval to take deduplication snapshot for the specified topic")
-                    Integer interval) {
+                    Integer interval,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        internalSetDeduplicationSnapshotInterval(interval).whenComplete((res, ex)
-                -> internalHandleResult(asyncResponse, res, ex, "Failed set deduplicationSnapshotInterval"));
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setDeduplicationSnapshotInterval", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -420,35 +457,19 @@
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), })
     public void deleteDeduplicationSnapshotInterval(@Suspended final AsyncResponse asyncResponse,
-                                                   @PathParam("tenant") String tenant,
-                                                   @PathParam("namespace") String namespace,
-                                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetDeduplicationSnapshotInterval(null).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed delete deduplicationSnapshotInterval", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed delete deduplicationSnapshotInterval", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(Response.noContent().build());
-            }
-        });
-    }
-
-    @DELETE
-    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
-    @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), })
-    public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
-                                                       @PathParam("tenant") String tenant,
-                                                       @PathParam("namespace") String namespace,
-                                                       @PathParam("topic") @Encoded String encodedTopic) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("deleteDeduplicationSnapshotInterval", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -458,14 +479,19 @@
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
             @ApiResponse(code = 500, message = "Internal server error"), })
     public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
-                                         @PathParam("tenant") String tenant,
-                                         @PathParam("namespace") String namespace,
-                                         @PathParam("topic") @Encoded String encodedTopic,
-                                         @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetInactiveTopicPolicies(applied).whenComplete((res, ex)
-                        -> internalHandleResult(asyncResponse, res, ex, "Failed get InactiveTopicPolicies"));
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetInactiveTopicPolicies(applied))
+            .thenApply(asyncResponse::resume).exceptionally(ex -> {
+                handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -474,15 +500,21 @@
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), })
     public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
-                                                @PathParam("tenant") String tenant,
-                                                @PathParam("namespace") String namespace,
-                                                @PathParam("topic") @Encoded String encodedTopic,
-                                                @ApiParam(value = "inactive topic policies for the specified topic")
-                                                        InactiveTopicPolicies inactiveTopicPolicies) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "inactive topic policies for the specified topic")
+            InactiveTopicPolicies inactiveTopicPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetInactiveTopicPolicies(inactiveTopicPolicies).whenComplete((res, ex)
-                -> internalHandleResult(asyncResponse, res, ex, "Failed set InactiveTopicPolicies"));
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setInactiveTopicPolicies", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -491,11 +523,19 @@
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), })
     public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
-                                                       @PathParam("tenant") String tenant,
-                                                       @PathParam("namespace") String namespace,
-                                                       @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        setInactiveTopicPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetInactiveTopicPolicies(null))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("deleteInactiveTopicPolicies", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -505,14 +545,20 @@
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
             @ApiResponse(code = 500, message = "Internal server error"), })
     public void getMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
-                                                    @PathParam("tenant") String tenant,
-                                                    @PathParam("namespace") String namespace,
-                                                    @PathParam("topic") @Encoded String encodedTopic,
-                                                    @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetMaxUnackedMessagesOnSubscription(applied).whenComplete((res, ex)
-                -> internalHandleResult(asyncResponse, res, ex, "Failed get maxUnackedMessagesOnSubscription"));
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getMaxUnackedMessagesOnSubscription", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -525,14 +571,19 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Max unacked messages on subscription policies for the specified topic")
                     Integer maxUnackedNum) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
         validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-        internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum).whenComplete((res, ex)
-                -> internalHandleResult(asyncResponse, res, ex, "Failed set MaxUnackedMessagesOnSubscription"));
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setMaxUnackedMessagesOnSubscription", ex, asyncResponse);
+                return null;
+            });
     }
 
 
@@ -543,11 +594,21 @@
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), })
     public void deleteMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
-                                                       @PathParam("tenant") String tenant,
-                                                       @PathParam("namespace") String namespace,
-                                                       @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        setMaxUnackedMessagesOnSubscription(asyncResponse, tenant, namespace, encodedTopic, null);
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(null))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("deleteMaxUnackedMessagesOnSubscription", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -557,23 +618,20 @@
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
             @ApiResponse(code = 500, message = "Internal server error"), })
     public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
-                                           @PathParam("tenant") String tenant,
-                                           @PathParam("namespace") String namespace,
-                                           @PathParam("topic") @Encoded String encodedTopic,
-                                           @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetDelayedDeliveryPolicies(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get DelayedDeliveryPolicies", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get DelayedDeliveryPolicies", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getDelayedDeliveryPolicies", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -586,13 +644,20 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Delayed delivery policies for the specified topic")
                     DelayedDeliveryPolicies deliveryPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
-        internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies);
+        validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetDelayedDeliveryPolicies(deliveryPolicies))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setDelayedDeliveryPolicies", ex, asyncResponse);
+                return null;
+            });
     }
 
 
@@ -603,11 +668,21 @@
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), })
     public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
-                                              @PathParam("tenant") String tenant,
-                                              @PathParam("namespace") String namespace,
-                                              @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        setDelayedDeliveryPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
+        validatePoliciesReadOnlyAccess();
+        validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetDelayedDeliveryPolicies(null))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("deleteDelayedDeliveryPolicies", ex, asyncResponse);
+                return null;
+            });
     }
 
     /**
@@ -1467,13 +1542,21 @@
             @ApiResponse(code = 404, message = "Topic policy does not exist"),
             @ApiResponse(code = 405,
                     message = "Topic level policy is disabled, to enable the topic level policy and retry")})
-    public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("tenant") String tenant,
-                                                                  @PathParam("namespace") String namespace,
-                                                                  @PathParam("topic") @Encoded String encodedTopic,
-                                                                  @QueryParam("applied") boolean applied) {
+    public void getBacklogQuotaMap(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        return internalGetBacklogQuota(applied);
+        preValidation(authoritative)
+            .thenAccept(__ -> asyncResponse.resume(internalGetBacklogQuota(applied)))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getBacklogQuotaMap", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -1490,10 +1573,17 @@
             @Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType, BacklogQuotaImpl backlogQuota) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType, backlogQuota))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setBacklogQuota", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -1505,12 +1595,19 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic,
-                                   @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType) {
+            @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType, null))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeBacklogQuota", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -1520,22 +1617,31 @@
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 405, message =
                     "Topic level policy is disabled, enable the topic level policy and retry")})
-    public Integer getMessageTTL(@PathParam("tenant") String tenant,
-                             @PathParam("namespace") String namespace,
-                             @PathParam("topic") @Encoded String encodedTopic,
-                             @QueryParam("applied") boolean applied) {
+    public void getMessageTTL(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        return getTopicPolicies(topicName)
-                .map(TopicPolicies::getMessageTTLInSeconds)
-                .orElseGet(() -> {
-                    if (applied) {
-                        Integer otherLevelTTL = getNamespacePolicies(namespaceName).message_ttl_in_seconds;
-                        return otherLevelTTL == null ? pulsar().getConfiguration().getTtlDurationDefaultInSeconds()
-                                : otherLevelTTL;
-                    }
-                    return null;
-                });
+        preValidation(authoritative)
+            .thenAccept(__ ->
+                asyncResponse.resume(getTopicPolicies(topicName)
+                    .map(TopicPolicies::getMessageTTLInSeconds)
+                    .orElseGet(() -> {
+                        if (applied) {
+                            Integer otherLevelTTL = getNamespacePolicies(namespaceName).message_ttl_in_seconds;
+                            return otherLevelTTL == null ? pulsar().getConfiguration().getTtlDurationDefaultInSeconds()
+                                    : otherLevelTTL;
+                        }
+                        return null;
+                    }))
+            )
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getMessageTTL", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -1548,14 +1654,21 @@
                     "Topic level policy is disabled, enable the topic level policy and retry"),
             @ApiResponse(code = 412, message = "Invalid message TTL value")})
     public void setMessageTTL(@Suspended final AsyncResponse asyncResponse,
-                              @PathParam("tenant") String tenant,
-                              @PathParam("namespace") String namespace,
-                              @PathParam("topic") @Encoded String encodedTopic,
-                              @ApiParam(value = "TTL in seconds for the specified namespace", required = true)
-                              @QueryParam("messageTTL") int messageTTL) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "TTL in seconds for the specified namespace", required = true)
+            @QueryParam("messageTTL") Integer messageTTL,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMessageTTL(asyncResponse, messageTTL);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMessageTTL(messageTTL))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setMessageTTL", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -1569,12 +1682,19 @@
                     message = "Topic level policy is disabled, enable the topic level policy and retry"),
             @ApiResponse(code = 412, message = "Invalid message TTL value")})
     public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
-                              @PathParam("tenant") String tenant,
-                              @PathParam("namespace") String namespace,
-                              @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMessageTTL(asyncResponse, null);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMessageTTL(null))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeMessageTTL", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -1586,23 +1706,20 @@
             @ApiResponse(code = 405,
                     message = "Topic level policy is disabled, to enable the topic level policy and retry")})
     public void getDeduplication(@Suspended final AsyncResponse asyncResponse,
-                             @PathParam("tenant") String tenant,
-                             @PathParam("namespace") String namespace,
-                             @PathParam("topic") @Encoded String encodedTopic,
-                             @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetDeduplication(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get Deduplication", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get Deduplication", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetDeduplication(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getDeduplication", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -1617,21 +1734,18 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "DeduplicationEnabled policies for the specified topic")
                     Boolean enabled) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetDeduplication(enabled).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed updated deduplication", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed updated deduplication", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(Response.noContent().build());
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetDeduplication(enabled))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setDeduplication", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -1643,11 +1757,19 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeDeduplication(@Suspended final AsyncResponse asyncResponse,
-                                           @PathParam("tenant") String tenant,
-                                           @PathParam("namespace") String namespace,
-                                           @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        setDeduplication(asyncResponse, tenant, namespace, encodedTopic, null);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetDeduplication(null))
+            .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setDeduplication", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -1662,16 +1784,16 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("applied") boolean applied) {
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        try {
-            internalGetRetention(asyncResponse, applied);
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        preValidation(authoritative)
+            .thenRun(() -> asyncResponse.resume(internalGetRetention(applied)))
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getRetention", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -1687,17 +1809,13 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetRetention(retention).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed updated retention", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed updated retention", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetRetention(retention))
+            .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully updated retention: namespace={}, topic={}, retention={}",
                             clientAppId(),
@@ -1707,8 +1825,11 @@
                 } catch (JsonProcessingException ignore) {
                 }
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setRetention", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -1721,23 +1842,25 @@
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota")})
     public void removeRetention(@Suspended final AsyncResponse asyncResponse,
-                             @PathParam("tenant") String tenant,
-                             @PathParam("namespace") String namespace,
-                             @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemoveRetention().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed updated retention", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemoveRetention())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove retention: namespace={}, topic={}",
                         clientAppId(),
                         namespaceName,
                         topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setRetention", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -1749,23 +1872,20 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void getPersistence(@Suspended final AsyncResponse asyncResponse,
-                               @PathParam("tenant") String tenant,
-                               @PathParam("namespace") String namespace,
-                               @PathParam("topic") @Encoded String encodedTopic,
-                               @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetPersistence(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get persistence policies", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get persistence policies", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetPersistence(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getPersistence", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -1778,21 +1898,17 @@
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 400, message = "Invalid persistence policies")})
     public void setPersistence(@Suspended final AsyncResponse asyncResponse,
-                               @PathParam("tenant") String tenant,
-                               @PathParam("namespace") String namespace,
-                               @PathParam("topic") @Encoded String encodedTopic,
-                               @ApiParam(value = "Bookkeeper persistence policies for specified topic")
-                                       PersistencePolicies persistencePolicies) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Bookkeeper persistence policies for specified topic")
+            PersistencePolicies persistencePolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetPersistence(persistencePolicies).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed updated persistence policies", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed updated persistence policies", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetPersistence(persistencePolicies))
+            .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully updated persistence policies: "
                                     + "namespace={}, topic={}, persistencePolicies={}",
@@ -1803,8 +1919,11 @@
                 } catch (JsonProcessingException ignore) {
                 }
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setPersistence", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -1816,23 +1935,25 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removePersistence(@Suspended final AsyncResponse asyncResponse,
-                                  @PathParam("tenant") String tenant,
-                                  @PathParam("namespace") String namespace,
-                                  @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemovePersistence().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed updated retention", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemovePersistence())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}",
                         clientAppId(),
                         namespaceName,
                         topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removePersistence", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -1844,23 +1965,22 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        try {
-            Optional<Integer> maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic();
-            if (!maxSubscriptionsPerTopic.isPresent()) {
-                asyncResponse.resume(Response.noContent().build());
-            } else {
-                asyncResponse.resume(maxSubscriptionsPerTopic.get());
-            }
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        preValidation(authoritative)
+            .thenRun(() -> {
+                Optional<Integer> maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic();
+                asyncResponse.resume(maxSubscriptionsPerTopic.isPresent() ? maxSubscriptionsPerTopic.get()
+                        : Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getMaxSubscriptions", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -1873,26 +1993,25 @@
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 412, message = "Invalid value of maxSubscriptionsPerTopic")})
     public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic,
-                                @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Updating maxSubscriptionsPerTopic failed", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Updating maxSubscriptionsPerTopic failed", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic))
+            .thenRun(() -> {
                 log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}"
                                 + ", maxSubscriptions={}"
                         , clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic);
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setMaxSubscriptions", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -1904,23 +2023,23 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant,
-                                   @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMaxSubscriptionsPerTopic(null).whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove maxSubscriptionsPerTopic", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                log.info("[{}] Successfully remove maximum subscription limit: namespace={}, topic={}",
-                        clientAppId(),
-                        namespaceName,
-                        topicName.getLocalName());
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null))
+            .thenRun(() -> {
+                log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}",
+                        clientAppId(), namespaceName, topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeMaxSubscriptions", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -1932,23 +2051,20 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
-                                          @PathParam("tenant") String tenant,
-                                          @PathParam("namespace") String namespace,
-                                          @PathParam("topic") @Encoded String encodedTopic,
-                                          @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetReplicatorDispatchRate(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get replicator dispatchRate", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get replicator dispatchRate", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetReplicatorDispatchRate(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getReplicatorDispatchRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -1961,27 +2077,25 @@
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 412, message = "Invalid value of replicatorDispatchRate")})
     public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
-                                          @PathParam("tenant") String tenant,
-                                          @PathParam("namespace") String namespace,
-                                          @PathParam("topic") @Encoded String encodedTopic,
-                                          @ApiParam(value = "Replicator dispatch rate of the topic")
-                                                  DispatchRateImpl dispatchRate) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetReplicatorDispatchRate(dispatchRate).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Updating replicatorDispatchRate failed", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Updating replicatorDispatchRate failed", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate))
+            .thenRun(() -> {
                 log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
-                                + ", replicatorDispatchRate={}"
-                        , clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate);
+                                + ", replicatorDispatchRate={}",
+                        clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate);
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setReplicatorDispatchRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -1993,21 +2107,23 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
-                                               @PathParam("tenant") String tenant,
-                                               @PathParam("namespace") String namespace,
-                                               @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetReplicatorDispatchRate(null).whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove replicatorDispatchRate", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetReplicatorDispatchRate(null))
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
                         clientAppId(), namespaceName, topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeReplicatorDispatchRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -2019,23 +2135,20 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void getMaxProducers(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic,
-                                @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetMaxProducers(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get maxProducers", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get maxProducers", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetMaxProducers(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getMaxProducers", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -2048,28 +2161,27 @@
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 412, message = "Invalid value of maxProducers")})
     public void setMaxProducers(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic,
-                                @ApiParam(value = "The max producers of the topic") int maxProducers) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "The max producers of the topic") int maxProducers) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMaxProducers(maxProducers).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed updated persistence policies", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed updated persistence policies", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxProducers(maxProducers))
+            .thenRun(() -> {
                 log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}",
                         clientAppId(),
                         namespaceName,
                         topicName.getLocalName(),
                         maxProducers);
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setMaxProducers", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -2081,23 +2193,25 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeMaxProducers(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant,
-                                   @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemoveMaxProducers().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove maxProducers", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemoveMaxProducers())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove max producers: namespace={}, topic={}",
                         clientAppId(),
                         namespaceName,
                         topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeMaxProducers", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -2109,23 +2223,20 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void getMaxConsumers(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic,
-                                @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetMaxConsumers(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get maxConsumers", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get maxConsumers", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetMaxConsumers(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getMaxConsumers", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -2138,28 +2249,27 @@
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 412, message = "Invalid value of maxConsumers")})
     public void setMaxConsumers(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic,
-                                @ApiParam(value = "The max consumers of the topic") int maxConsumers) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "The max consumers of the topic") int maxConsumers) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMaxConsumers(maxConsumers).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed updated persistence policies", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed updated persistence policies", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxConsumers(maxConsumers))
+            .thenRun(() -> {
                 log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}",
                         clientAppId(),
                         namespaceName,
                         topicName.getLocalName(),
                         maxConsumers);
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setMaxConsumers", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -2171,23 +2281,25 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeMaxConsumers(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant,
-                                   @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemoveMaxConsumers().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove maxConsumers", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemoveMaxConsumers())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove max consumers: namespace={}, topic={}",
                         clientAppId(),
                         namespaceName,
                         topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeMaxConsumers", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -2199,23 +2311,21 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
-                                  @PathParam("tenant") String tenant,
-                                  @PathParam("namespace") String namespace,
-                                  @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        try {
-            Optional<Integer> policies = internalGetMaxMessageSize();
-            if (policies.isPresent()) {
-                asyncResponse.resume(policies.get());
-            } else {
-                asyncResponse.resume(Response.noContent().build());
-            }
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        preValidation(authoritative)
+            .thenRun(() -> {
+                Optional<Integer> policies = internalGetMaxMessageSize();
+                asyncResponse.resume(policies.isPresent() ? policies.get() : Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getMaxMessageSize", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -2228,28 +2338,27 @@
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 412, message = "Invalid value of maxConsumers")})
     public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
-                                  @PathParam("tenant") String tenant,
-                                  @PathParam("namespace") String namespace,
-                                  @PathParam("topic") @Encoded String encodedTopic,
-                                  @ApiParam(value = "The max message size of the topic") int maxMessageSize) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "The max message size of the topic") int maxMessageSize) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMaxMessageSize(maxMessageSize).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed updated persistence policies", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed updated persistence policies", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxMessageSize(maxMessageSize))
+            .thenRun(() -> {
                 log.info("[{}] Successfully set max message size: namespace={}, topic={}, maxMessageSiz={}",
                         clientAppId(),
                         namespaceName,
                         topicName.getLocalName(),
                         maxMessageSize);
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setMaxMessageSize", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -2261,23 +2370,25 @@
                     message = "Topic level policy is disabled, to enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant,
-                                   @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMaxMessageSize(null).whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove maxMessageSize", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxMessageSize(null))
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove max message size: namespace={}, topic={}",
                         clientAppId(),
                         namespaceName,
                         topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeMaxMessageSize", ex, asyncResponse);
+                return null;
+            });
     }
 
 
@@ -2495,20 +2606,17 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("applied") boolean applied) {
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetDispatchRate(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get dispatchRate", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get dispatchRate", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetDispatchRate(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getDispatchRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -2520,21 +2628,16 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void setDispatchRate(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic,
-                                @ApiParam(value = "Dispatch rate for the specified topic")
-                                            DispatchRateImpl dispatchRate) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Dispatch rate for the specified topic") DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetDispatchRate(dispatchRate).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed to set topic dispatch rate", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed to set topic dispatch rate");
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetDispatchRate(dispatchRate))
+            .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic dispatch rate:"
                                     + " tenant={}, namespace={}, topic={}, dispatchRate={}",
@@ -2545,8 +2648,11 @@
                             jsonMapper().writeValueAsString(dispatchRate));
                 } catch (JsonProcessingException ignore) {}
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setDispatchRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -2558,24 +2664,26 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant,
-                                   @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemoveDispatchRate().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove topic dispatch rate", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemoveDispatchRate())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}",
-                    clientAppId(),
-                    tenant,
-                    namespace,
-                    topicName.getLocalName());
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeDispatchRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -2590,20 +2698,17 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("applied") boolean applied) {
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetSubscriptionDispatchRate(applied).whenComplete((res, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed get subscription dispatchRate", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed get subscription dispatchRate", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                asyncResponse.resume(res);
-            }
-        });
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetSubscriptionDispatchRate(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getSubscriptionDispatchRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -2619,18 +2724,14 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Subscription message dispatch rate for the specified topic")
                     DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName());
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate))
+            .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic subscription dispatch rate:"
                                     + " tenant={}, namespace={}, topic={}, dispatchRate={}",
@@ -2641,8 +2742,11 @@
                             jsonMapper().writeValueAsString(dispatchRate));
                 } catch (JsonProcessingException ignore) {}
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setSubscriptionDispatchRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -2654,24 +2758,26 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant,
-                                   @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemoveSubscriptionDispatchRate())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
-                    clientAppId(),
-                    tenant,
-                    namespace,
-                    topicName.getLocalName());
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeSubscriptionDispatchRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -2683,14 +2789,20 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
-                                       @PathParam("tenant") String tenant,
-                                       @PathParam("namespace") String namespace,
-                                       @PathParam("topic") @Encoded String encodedTopic,
-                                       @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetCompactionThreshold(applied).whenComplete((res, ex)
-                -> internalHandleResult(asyncResponse, res, ex, "Failed get compaction threshold"));
+        preValidation(authoritative)
+            .thenCompose(__ -> internalGetCompactionThreshold(applied))
+            .thenApply(asyncResponse::resume)
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getCompactionThreshold", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -2702,20 +2814,16 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic,
-                                @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetCompactionThreshold(compactionThreshold).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed to set topic dispatch rate", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed to set topic dispatch rate");
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold))
+            .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic compaction threshold:"
                                     + " tenant={}, namespace={}, topic={}, compactionThreshold={}",
@@ -2726,8 +2834,11 @@
                             jsonMapper().writeValueAsString(compactionThreshold));
                 } catch (JsonProcessingException ignore) {}
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setCompactionThreshold", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -2739,24 +2850,26 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant,
-                                   @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemoveCompactionThreshold().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove topic dispatch rate", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemoveCompactionThreshold())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
-                    clientAppId(),
-                    tenant,
-                    namespace,
-                    topicName.getLocalName());
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeCompactionThreshold", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -2768,23 +2881,22 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void getMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
-                                       @PathParam("tenant") String tenant,
-                                       @PathParam("namespace") String namespace,
-                                       @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        try {
-            Optional<Integer> maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription();
-            if (!maxConsumersPerSubscription.isPresent()) {
-                asyncResponse.resume(Response.noContent().build());
-            } else {
-                asyncResponse.resume(maxConsumersPerSubscription.get());
-            }
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        preValidation(authoritative)
+            .thenRun(() -> {
+                Optional<Integer> maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription();
+                asyncResponse.resume(maxConsumersPerSubscription.isPresent() ? maxConsumersPerSubscription.get()
+                        : Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getMaxConsumersPerSubscription", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -2800,17 +2912,13 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed to set topic {} max consumers per subscription ", topicName.getLocalName(), ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed to set topic max consumers per subscription");
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription))
+            .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic max consumers per subscription:"
                                     + " tenant={}, namespace={}, topic={}, maxConsumersPerSubscription={}",
@@ -2821,8 +2929,11 @@
                             jsonMapper().writeValueAsString(maxConsumersPerSubscription));
                 } catch (JsonProcessingException ignore) {}
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setMaxConsumersPerSubscription", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -2834,16 +2945,15 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
-                                          @PathParam("tenant") String tenant,
-                                          @PathParam("namespace") String namespace,
-                                          @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemoveMaxConsumersPerSubscription().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove topic {} max consuners per subscription", topicName.getLocalName(), ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemoveMaxConsumersPerSubscription())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove topic max consumers per subscription:"
                                 + " tenant={}, namespace={}, topic={}",
                         clientAppId(),
@@ -2851,8 +2961,11 @@
                         namespace,
                         topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeMaxConsumersPerSubscription", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -2866,21 +2979,20 @@
     public void getPublishRate(@Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        try {
-            Optional<PublishRate> publishRate = internalGetPublishRate();
-            if (!publishRate.isPresent()) {
-                asyncResponse.resume(Response.noContent().build());
-            } else {
-                asyncResponse.resume(publishRate.get());
-            }
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        preValidation(authoritative)
+            .thenRun(() -> {
+                Optional<PublishRate> publishRate = internalGetPublishRate();
+                asyncResponse.resume(publishRate.isPresent() ? publishRate.get()
+                        : Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getPublishRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @POST
@@ -2892,20 +3004,16 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void setPublishRate(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic,
-                                @ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetPublishRate(publishRate).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed to set topic dispatch rate", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed to set topic dispatch rate");
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetPublishRate(publishRate))
+            .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic publish rate:"
                                     + " tenant={}, namespace={}, topic={}, publishRate={}",
@@ -2916,77 +3024,11 @@
                             jsonMapper().writeValueAsString(publishRate));
                 } catch (JsonProcessingException ignore) {}
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
-    }
-
-    @GET
-    @Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
-    @ApiOperation(value = "Get is enable sub type fors specified topic.")
-    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Topic does not exist"),
-            @ApiResponse(code = 405,
-                    message = "Topic level policy is disabled, please enable the topic level policy and retry"),
-            @ApiResponse(code = 409, message = "Concurrent modification")})
-    public void getSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResponse,
-                                            @PathParam("tenant") String tenant,
-                                            @PathParam("namespace") String namespace,
-                                            @PathParam("topic") @Encoded String encodedTopic) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        try {
-            Optional<List<SubType>> subscriptionTypesEnabled = internalGetSubscriptionTypesEnabled();
-            if (!subscriptionTypesEnabled.isPresent()) {
-                asyncResponse.resume(Response.noContent().build());
-            } else {
-                Set<SubscriptionType> subscriptionTypes = new HashSet<>();
-                subscriptionTypesEnabled.get().forEach(subType ->
-                        subscriptionTypes.add(SubscriptionType.valueOf(subType.name())));
-                asyncResponse.resume(subscriptionTypes);
-            }
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
-    }
-
-    @POST
-    @Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
-    @ApiOperation(value = "Set is enable sub types for specified topic")
-    @ApiResponses(value = {@ApiResponse(code = 403, message = "Topic does not exist"),
-            @ApiResponse(code = 404, message = "Topic does not exist"),
-            @ApiResponse(code = 405,
-                    message = "Topic level policy is disabled, please enable the topic level policy and retry"),
-            @ApiResponse(code = 409, message = "Concurrent modification")})
-    public void setSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResponse,
-                                            @PathParam("tenant") String tenant,
-                                            @PathParam("namespace") String namespace,
-                                            @PathParam("topic") @Encoded String encodedTopic,
-                                            @ApiParam(value = "Enable sub types for the specified topic")
-                                                        Set<SubscriptionType> subscriptionTypesEnabled) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed to set topic is enable sub types", ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed to set topic is enable sub types", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
-                try {
-                    log.info("[{}] Successfully set topic is enabled sub types :"
-                                    + " tenant={}, namespace={}, topic={}, subscriptionTypesEnabled={}",
-                            clientAppId(),
-                            tenant,
-                            namespace,
-                            topicName.getLocalName(),
-                            jsonMapper().writeValueAsString(subscriptionTypesEnabled));
-                } catch (JsonProcessingException ignore) {}
-                asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setPublishRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -2998,24 +3040,90 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removePublishRate(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant,
-                                   @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemovePublishRate().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove topic publish rate", ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemovePublishRate())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}",
-                    clientAppId(),
-                    tenant,
-                    namespace,
-                    topicName.getLocalName());
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removePublishRate", ex, asyncResponse);
+                return null;
+            });
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
+    @ApiOperation(value = "Get is enable sub type fors specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+            .thenRun(() -> {
+                Optional<List<SubType>> subscriptionTypesEnabled = internalGetSubscriptionTypesEnabled();
+                asyncResponse.resume(subscriptionTypesEnabled.isPresent() ? subscriptionTypesEnabled.get()
+                        : Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("getSubscriptionTypesEnabled", ex, asyncResponse);
+                return null;
+            });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
+    @ApiOperation(value = "Set is enable sub types for specified topic")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Topic does not exist"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Enable sub types for the specified topic")
+            Set<SubscriptionType> subscriptionTypesEnabled) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled))
+            .thenRun(() -> {
+                try {
+                    log.info("[{}] Successfully set topic is enabled sub types :"
+                                    + " tenant={}, namespace={}, topic={}, subscriptionTypesEnabled={}",
+                            clientAppId(),
+                            tenant,
+                            namespace,
+                            topicName.getLocalName(),
+                            jsonMapper().writeValueAsString(subscriptionTypesEnabled));
+                } catch (JsonProcessingException ignore) {}
+                asyncResponse.resume(Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setSubscriptionTypesEnabled", ex, asyncResponse);
+                return null;
+            });
     }
 
     @GET
@@ -3027,14 +3135,18 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse,
-                                @PathParam("tenant") String tenant,
-                                @PathParam("namespace") String namespace,
-                                @PathParam("topic") @Encoded String encodedTopic,
-                                @QueryParam("applied") boolean applied) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") boolean applied,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalGetSubscribeRate(applied).whenComplete((res, ex) -> {
-            internalHandleResult(asyncResponse, res, ex, "Failed get subscribe rate");
+        preValidation(authoritative)
+                .thenCompose(__ -> internalGetSubscribeRate(applied))
+                .thenApply(asyncResponse::resume).exceptionally(ex -> {
+            handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
+            return null;
         });
     }
 
@@ -3051,17 +3163,13 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalSetSubscribeRate(subscribeRate).whenComplete((r, ex) -> {
-            if (ex instanceof RestException) {
-                log.error("Failed to set topic {} subscribe rate", topicName.getLocalName(), ex);
-                asyncResponse.resume(ex);
-            } else if (ex != null) {
-                log.error("Failed to set topic subscribe rate");
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalSetSubscribeRate(subscribeRate))
+            .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic subscribe rate:"
                                     + " tenant={}, namespace={}, topic={}, subscribeRate={}",
@@ -3072,8 +3180,11 @@
                             jsonMapper().writeValueAsString(subscribeRate));
                 } catch (JsonProcessingException ignore) {}
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("setSubscribeRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
@@ -3085,24 +3196,26 @@
                     message = "Topic level policy is disabled, please enable the topic level policy and retry"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
     public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse,
-                                   @PathParam("tenant") String tenant,
-                                   @PathParam("namespace") String namespace,
-                                   @PathParam("topic") @Encoded String encodedTopic) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation();
-        internalRemoveSubscribeRate().whenComplete((r, ex) -> {
-            if (ex != null) {
-                log.error("Failed to remove topic {} subscribe rate ", topicName.getLocalName(), ex);
-                asyncResponse.resume(new RestException(ex));
-            } else {
+        preValidation(authoritative)
+            .thenCompose(__ -> internalRemoveSubscribeRate())
+            .thenRun(() -> {
                 log.info("[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}",
                         clientAppId(),
                         tenant,
                         namespace,
                         topicName.getLocalName());
                 asyncResponse.resume(Response.noContent().build());
-            }
-        });
+            })
+            .exceptionally(ex -> {
+                handleTopicPolicyException("removeSubscribeRate", ex, asyncResponse);
+                return null;
+            });
     }
 
     @DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 807da68..87ff3b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -390,7 +390,6 @@
             //change persistent://tenant/namespace/xxx-partition-0  to persistent://tenant/namespace/xxx
             realTopicName = TopicName.get(topicName.getPartitionedTopicName());
         }
-        policiesCache.remove(realTopicName);
         listeners.remove(realTopicName);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 39289a4..e0bb3ab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -284,8 +284,6 @@
         int partitionNum = 3;
         admin.topics().createPartitionedTopic(topic, partitionNum);
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();
-        TopicName topicName = TopicName.get(topic);
-        Awaitility.await().until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(topicName));
 
         setTopicPoliciesAndValidate(admin2, admin3, topic);
         //for non-partitioned topic, we can get topic policies from every broker
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 3d47212..8ccecc6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -591,24 +591,30 @@
 
         Awaitility.await()
                 .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
+        admin.topics().createNonPartitionedTopic(persistenceTopic);
         admin.topics().setPersistence(persistenceTopic, persistencePolicies);
 
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getPersistence(persistenceTopic), persistencePolicies));
-
-        admin.lookups().lookupTopic(persistenceTopic);
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(persistenceTopic)
+                .subscriptionName("test")
+                .subscribe();
+        admin.topics().unload(persistenceTopic);
         Topic t = pulsar.getBrokerService().getOrCreateTopic(persistenceTopic).get();
         PersistentTopic persistentTopic = (PersistentTopic) t;
-        ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
-        assertEquals(managedLedgerConfig.getEnsembleSize(), 3);
-        assertEquals(managedLedgerConfig.getWriteQuorumSize(), 3);
-        assertEquals(managedLedgerConfig.getAckQuorumSize(), 3);
-        assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.1);
+        Awaitility.await().untilAsserted(() -> {
+            ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+            assertEquals(managedLedgerConfig.getEnsembleSize(), 3);
+            assertEquals(managedLedgerConfig.getWriteQuorumSize(), 3);
+            assertEquals(managedLedgerConfig.getAckQuorumSize(), 3);
+            assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.1);
+        });
 
         PersistencePolicies getPersistencePolicies = admin.topics().getPersistence(persistenceTopic);
         log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic);
         Assert.assertEquals(getPersistencePolicies, persistencePolicies);
+        consumer.close();
     }
 
     @Test
@@ -664,7 +670,7 @@
 
         Awaitility.await()
                 .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
+        admin.topics().createNonPartitionedTopic(persistenceTopic);
         admin.topics().setPersistence(persistenceTopic, persistencePolicies);
 
         Awaitility.await()
@@ -732,12 +738,12 @@
         Awaitility.await()
                 .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
 
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
         admin.topics().setMaxProducers(persistenceTopic, maxProducers);
 
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxProducers(persistenceTopic), maxProducers));
 
-        admin.topics().createPartitionedTopic(persistenceTopic, 2);
         Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create();
         Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create();
         Producer<byte[]> producer3 = null;
@@ -763,13 +769,12 @@
 
         Awaitility.await()
                 .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
         admin.topics().setMaxProducers(persistenceTopic, maxProducers);
 
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxProducers(persistenceTopic), maxProducers));
 
-        admin.topics().createPartitionedTopic(persistenceTopic, 2);
         Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create();
         Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create();
         Producer<byte[]> producer3 = null;
@@ -1301,13 +1306,12 @@
 
         Awaitility.await()
                 .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
         admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);
 
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumers(persistenceTopic), maxConsumers));
 
-        admin.topics().createPartitionedTopic(persistenceTopic, 2);
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
         Consumer<byte[]> consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
         Consumer<byte[]> consumer3 = null;
@@ -1333,13 +1337,12 @@
         Integer maxConsumers = 2;
         Awaitility.await()
                 .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));
-
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
         admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);
 
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumers(persistenceTopic), maxConsumers));
 
-        admin.topics().createPartitionedTopic(persistenceTopic, 2);
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
         Consumer<byte[]> consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
         Consumer<byte[]> consumer3 = null;
@@ -1688,8 +1691,8 @@
     public void testTopicMaxMessageSizeApi() throws Exception{
         Awaitility.await()
                 .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic)));
+        admin.topics().createNonPartitionedTopic(persistenceTopic);
         assertNull(admin.topics().getMaxMessageSize(persistenceTopic));
-
         admin.topics().setMaxMessageSize(persistenceTopic,10);
         Awaitility.await().until(()
                 -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic)) != null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index aaa4618..d0a04b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -204,6 +204,7 @@
         assertNotNull(listMap.get(topicName).get(0));
 
         admin.topics().deletePartitionedTopic(topic, true);
+        admin.namespaces().unload(NAMESPACE1);
         assertNull(map.get(topicName));
         assertNull(listMap.get(topicName));
     }