[improve][broker] Consistently add fine-grain authorization to REST API (#22202)
(cherry picked from commit 68c10925df43769eee7265b4af0ac8ee4913e715)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9094a46..0cb9bc6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -77,6 +77,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
@@ -358,7 +359,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -381,7 +383,8 @@
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenAccept(__ -> validateOffloadPolicies(offloadPolicies))
.thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
@@ -404,7 +407,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -428,7 +432,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse);
@@ -452,7 +457,8 @@
@ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -474,7 +480,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -497,7 +504,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
.thenAccept(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
@@ -525,7 +533,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -547,7 +556,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -571,7 +581,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetInactiveTopicPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
@@ -594,7 +605,8 @@
@ApiParam(value = "inactive topic policies for the specified topic")
InactiveTopicPolicies inactiveTopicPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -616,7 +628,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetInactiveTopicPolicies(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -640,7 +653,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -715,7 +729,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -1931,16 +1946,17 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalExamineMessageAsync(initialPosition, messagePosition, authoritative)
- .thenAccept(asyncResponse::resume)
- .exceptionally(ex -> {
- if (isNot307And404Exception(ex)) {
- log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName,
- ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
+ validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)
+ .thenCompose(__ -> internalExamineMessageAsync(initialPosition, messagePosition, authoritative))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName,
+ ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -2047,7 +2063,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalGetBacklogAsync(authoritative)
+ validateTopicOperationAsync(topicName, TopicOperation.GET_BACKLOG_SIZE)
+ .thenCompose(__ -> internalGetBacklogAsync(authoritative))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
Throwable t = FutureUtil.unwrapCompletionException(ex);
@@ -2103,7 +2120,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetBacklogQuota(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2186,7 +2204,8 @@
+ "For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenAccept(op -> {
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(() -> {
@@ -2267,7 +2286,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
.thenAccept(op -> asyncResponse.resume(op
.map(TopicPolicies::getMessageTTLInSeconds)
@@ -2304,7 +2324,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMessageTTL(messageTTL, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2331,7 +2352,8 @@
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMessageTTL(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2357,7 +2379,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetDeduplication(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2384,7 +2407,8 @@
@ApiParam(value = "DeduplicationEnabled policies for the specified topic")
Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplication(enabled, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2409,7 +2433,8 @@
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplication(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2515,7 +2540,6 @@
return null;
});
}
-
@GET
@Path("/{tenant}/{namespace}/{topic}/persistence")
@ApiOperation(
@@ -2536,7 +2560,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetPersistence(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2564,7 +2589,8 @@
@ApiParam(value = "Bookkeeper persistence policies for specified topic")
PersistencePolicies persistencePolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetPersistence(persistencePolicies, isGlobal))
.thenRun(() -> {
try {
@@ -2600,7 +2626,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemovePersistence(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}",
@@ -2631,7 +2658,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxSubscriptionsPerTopic(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
@@ -2659,7 +2687,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}"
@@ -2689,7 +2718,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}",
@@ -2719,7 +2749,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetReplicatorDispatchRate(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2746,7 +2777,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
@@ -2776,7 +2808,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
@@ -2806,7 +2839,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxProducers(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2833,7 +2867,8 @@
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "The max producers of the topic") int maxProducers) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxProducers(maxProducers, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}",
@@ -2865,7 +2900,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveMaxProducers(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove max producers: namespace={}, topic={}",
@@ -2897,7 +2933,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxConsumers(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2924,7 +2961,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The max consumers of the topic") int maxConsumers) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxConsumers(maxConsumers, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}",
@@ -2956,7 +2994,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveMaxConsumers(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove max consumers: namespace={}, topic={}",
@@ -2987,7 +3026,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxMessageSize(isGlobal))
.thenAccept(policies -> {
asyncResponse.resume(policies.isPresent() ? policies.get() : Response.noContent().build());
@@ -3016,7 +3056,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The max message size of the topic") int maxMessageSize) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxMessageSize(maxMessageSize, isGlobal))
.thenRun(() -> {
log.info(
@@ -3050,7 +3091,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxMessageSize(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove max message size: namespace={}, topic={}",
@@ -3358,7 +3400,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetDispatchRate(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -3384,7 +3427,8 @@
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Dispatch rate for the specified topic") DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
try {
@@ -3420,7 +3464,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveDispatchRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}",
@@ -3456,7 +3501,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscriptionDispatchRate(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -3484,7 +3530,8 @@
@ApiParam(value = "Subscription message dispatch rate for the specified topic")
DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
try {
@@ -3520,7 +3567,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveSubscriptionDispatchRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
@@ -3554,7 +3602,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscriptionLevelDispatchRate(
Codec.decode(encodedSubscriptionName), applied, isGlobal))
.thenApply(asyncResponse::resume)
@@ -3584,7 +3633,8 @@
@ApiParam(value = "Subscription message dispatch rate for the specified topic")
DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSubscriptionLevelDispatchRate(
Codec.decode(encodedSubscriptionName), dispatchRate, isGlobal))
.thenRun(() -> {
@@ -3622,7 +3672,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveSubscriptionLevelDispatchRate(
Codec.decode(encodedSubscriptionName), isGlobal))
.thenRun(() -> {
@@ -3654,7 +3705,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetCompactionThreshold(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -3680,7 +3732,8 @@
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold, isGlobal))
.thenRun(() -> {
try {
@@ -3716,7 +3769,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveCompactionThreshold(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
@@ -3751,7 +3805,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxConsumersPerSubscription(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
@@ -3779,7 +3834,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription, isGlobal))
.thenRun(() -> {
try {
@@ -3815,7 +3871,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveMaxConsumersPerSubscription(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic max consumers per subscription:"
@@ -3848,7 +3905,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetPublishRate(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
@@ -3875,7 +3933,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetPublishRate(publishRate, isGlobal))
.thenRun(() -> {
try {
@@ -3912,7 +3971,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemovePublishRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}",
@@ -3949,7 +4009,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscriptionTypesEnabled(isGlobal))
.thenAccept(op -> {
asyncResponse.resume(op.isPresent() ? op.get()
@@ -3979,7 +4040,8 @@
@ApiParam(value = "Enable sub types for the specified topic")
Set<SubscriptionType> subscriptionTypesEnabled) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled, isGlobal))
.thenRun(() -> {
try {
@@ -4015,7 +4077,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveSubscriptionTypesEnabled(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove subscription types enabled: namespace={}, topic={}",
@@ -4047,7 +4110,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscribeRate(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
@@ -4073,7 +4137,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSubscribeRate(subscribeRate, isGlobal))
.thenRun(() -> {
try {
@@ -4111,7 +4176,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveSubscribeRate(isGlobal))
.thenRun(() -> {
log.info(
@@ -4359,7 +4425,8 @@
+ "broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSchemaValidationEnforced(applied))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
@@ -4386,7 +4453,8 @@
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(required = true) boolean schemaValidationEnforced) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSchemaValidationEnforced(schemaValidationEnforced))
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -4582,7 +4650,8 @@
@ApiParam(value = "Settings for automatic subscription creation")
AutoSubscriptionCreationOverrideImpl autoSubscriptionCreationOverride) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetAutoSubscriptionCreation(autoSubscriptionCreationOverride, isGlobal))
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -4608,7 +4677,8 @@
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetAutoSubscriptionCreation(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getAutoSubscriptionCreation", ex, asyncResponse);
@@ -4633,7 +4703,8 @@
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetAutoSubscriptionCreation(null, isGlobal))
.thenRun(() -> {
log.info(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
index f07b9a6..bcb8e32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.awaitility.Awaitility.await;
import io.jsonwebtoken.Jwts;
import java.util.Set;
import java.util.UUID;
@@ -27,6 +28,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.security.MockedPulsarStandalone;
@@ -35,8 +38,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.awaitility.Awaitility.await;
-
public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {
@@ -172,4 +173,311 @@
}
}
+ @SneakyThrows
+ @Test
+ public void testOffloadPolicy() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // mocked data
+ final OffloadPoliciesImpl definedOffloadPolicies = new OffloadPoliciesImpl();
+ definedOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(100L);
+ definedOffloadPolicies.setManagedLedgerOffloadThresholdInSeconds(100L);
+ definedOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(200L);
+ definedOffloadPolicies.setManagedLedgerOffloadDriver("s3");
+ definedOffloadPolicies.setManagedLedgerOffloadBucket("buck");
+
+ // test superuser
+ superUserAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
+
+ // because the topic policies is eventual consistency, we should wait here
+ await().untilAsserted(() -> {
+ final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
+ });
+ superUserAdmin.topicPolicies().removeOffloadPolicies(topic);
+
+ await().untilAsserted(() -> {
+ final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.assertNull(offloadPolicy);
+ });
+
+ // test tenant manager
+
+ tenantManagerAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
+ await().untilAsserted(() -> {
+ final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
+ });
+ tenantManagerAdmin.topicPolicies().removeOffloadPolicies(topic);
+ await().untilAsserted(() -> {
+ final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.assertNull(offloadPolicy);
+ });
+
+ // test nobody
+
+ try {
+ subAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeOffloadPolicies(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Set.of(action));
+ try {
+ subAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeOffloadPolicies(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+ superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+ }
+ }
+
+ @SneakyThrows
+ @Test
+ public void testMaxUnackedMessagesOnConsumer() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // mocked data
+ int definedUnackedMessagesOnConsumer = 100;
+
+ // test superuser
+ superUserAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+
+ // because the topic policies is eventual consistency, we should wait here
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+ });
+ superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test tenant manager
+
+ tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+ });
+ tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test nobody
+
+ try {
+ subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Set.of(action));
+ try {
+ subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+ superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+ }
+ }
+
+ @SneakyThrows
+ @Test
+ public void testMaxUnackedMessagesOnSubscription() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // mocked data
+ int definedUnackedMessagesOnConsumer = 100;
+
+ // test superuser
+ superUserAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+
+ // because the topic policies is eventual consistency, we should wait here
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+ });
+ superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test tenant manager
+
+ tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+ });
+ tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test nobody
+
+ try {
+ subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Set.of(action));
+ try {
+ subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+ superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+ }
+
+ }
}