[refactor][broker] Use AuthenticationParameters for rest producer (#20046)
In #19975, we introduced a wrapper for all authentication parameters. This PR adds that wrapper to the Rest Producer.
* Use `AuthenticationParameters` to simplify parameter management in Rest Producer.
* Add method to the `AuthorizationService` that takes the `AuthenticationParameters`.
* Update annotations on Rest Producer to indicate that a 401 is an expected response.
This change is covered by the `TopicsAuthTest`.
- [x] `doc-not-needed`
This is an internal change that does not need to be documented.
PR in forked repository: skipping PR since the relevant tests pass locally
(cherry picked from commit 7990948a73e2c4dfa0e9c99ff223f0ee90e82dc3)
(cherry picked from commit 02b27e8e5aa0280a5b0f8a8cd80409b1cfe8d2db)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index f103c8e..c5b9c18 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -738,6 +738,13 @@
public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
TopicOperation operation,
+ AuthenticationParameters authParams) {
+ return allowTopicOperationAsync(topicName, operation, authParams.getOriginalPrincipal(),
+ authParams.getClientRole(), authParams.getClientAuthenticationDataSource());
+ }
+
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
+ TopicOperation operation,
String originalRole,
String role,
AuthenticationDataSource authData) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
index a8095f03..1a6bb52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
@@ -49,6 +49,7 @@
@Path("/persistent/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Produce message to a persistent topic.", response = String.class, responseContainer = "List")
@ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
@@ -76,6 +77,7 @@
@ApiOperation(value = "Produce message to a partition of a persistent topic.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
@@ -104,6 +106,7 @@
@Path("/non-persistent/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Produce message to a persistent topic.", response = String.class, responseContainer = "List")
@ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
@@ -132,6 +135,7 @@
@ApiOperation(value = "Produce message to a partition of a persistent topic.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 86e8956..14996c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.rest;
+import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.URI;
@@ -53,6 +54,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -80,6 +82,7 @@
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -760,14 +763,24 @@
if (!isClientAuthenticated(clientAppId())) {
throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
}
+ AuthenticationParameters authParams = authParams();
+ boolean isAuthorized;
+ try {
+ isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, authParams)
+ .get(config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Time-out {} sec while checking authorization on {} ",
+ config().getMetadataStoreOperationTimeoutSeconds(), topicName);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, "Time-out while checking authorization");
+ } catch (Exception e) {
+ log.warn("Producer-client with Role - {} {} failed to get permissions for topic - {}. {}",
+ authParams.getClientRole(), authParams.getOriginalPrincipal(), topicName, e.getMessage());
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get permissions");
+ }
- boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
- .canProduce(topicName, originalPrincipal() == null ? clientAppId() : originalPrincipal(),
- clientAuthData());
if (!isAuthorized) {
- throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to produce to topic %s"
- + " with clientAppId [%s] and authdata %s", topicName.toString(),
- clientAppId(), clientAuthData()));
+ throw new RestException(Status.UNAUTHORIZED, "Unauthorized to produce to topic " + topicName);
}
}
}