[refactor][broker] Use AuthenticationParameters for rest producer (#20046)

In #19975, we introduced a wrapper for all authentication parameters. This PR adds that wrapper to the Rest Producer.

* Use `AuthenticationParameters` to simplify parameter management in Rest Producer.
* Add method to the `AuthorizationService` that takes the `AuthenticationParameters`.
* Update annotations on Rest Producer to indicate that a 401 is an expected response.

This change is covered by the `TopicsAuthTest`.

- [x] `doc-not-needed`

This is an internal change that does not need to be documented.

PR in forked repository: skipping PR since the relevant tests pass locally

(cherry picked from commit 7990948a73e2c4dfa0e9c99ff223f0ee90e82dc3)
(cherry picked from commit 02b27e8e5aa0280a5b0f8a8cd80409b1cfe8d2db)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index f103c8e..c5b9c18 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -738,6 +738,13 @@
 
     public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
                                                                TopicOperation operation,
+                                                               AuthenticationParameters authParams) {
+        return allowTopicOperationAsync(topicName, operation, authParams.getOriginalPrincipal(),
+                authParams.getClientRole(), authParams.getClientAuthenticationDataSource());
+    }
+
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
+                                                               TopicOperation operation,
                                                                String originalRole,
                                                                String role,
                                                                AuthenticationDataSource authData) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
index a8095f03..1a6bb52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
@@ -49,6 +49,7 @@
     @Path("/persistent/{tenant}/{namespace}/{topic}")
     @ApiOperation(value = "Produce message to a persistent topic.", response = String.class, responseContainer = "List")
     @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
@@ -76,6 +77,7 @@
     @ApiOperation(value = "Produce message to a partition of a persistent topic.",
             response = String.class, responseContainer = "List")
     @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
@@ -104,6 +106,7 @@
     @Path("/non-persistent/{tenant}/{namespace}/{topic}")
     @ApiOperation(value = "Produce message to a persistent topic.", response = String.class, responseContainer = "List")
     @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
@@ -132,6 +135,7 @@
     @ApiOperation(value = "Produce message to a partition of a persistent topic.",
             response = String.class, responseContainer = "List")
     @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 86e8956..14996c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.rest;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.net.URI;
@@ -53,6 +54,7 @@
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.authentication.AuthenticationParameters;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -80,6 +82,7 @@
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -760,14 +763,24 @@
             if (!isClientAuthenticated(clientAppId())) {
                 throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
             }
+            AuthenticationParameters authParams = authParams();
+            boolean isAuthorized;
+            try {
+                isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+                        .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, authParams)
+                        .get(config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+            } catch (InterruptedException e) {
+                log.warn("Time-out {} sec while checking authorization on {} ",
+                        config().getMetadataStoreOperationTimeoutSeconds(), topicName);
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Time-out while checking authorization");
+            } catch (Exception e) {
+                log.warn("Producer-client  with Role - {} {} failed to get permissions for topic - {}. {}",
+                        authParams.getClientRole(), authParams.getOriginalPrincipal(), topicName, e.getMessage());
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get permissions");
+            }
 
-            boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
-                    .canProduce(topicName, originalPrincipal() == null ? clientAppId() : originalPrincipal(),
-                            clientAuthData());
             if (!isAuthorized) {
-                throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to produce to topic %s"
-                                        + " with clientAppId [%s] and authdata %s", topicName.toString(),
-                        clientAppId(), clientAuthData()));
+                throw new RestException(Status.UNAUTHORIZED, "Unauthorized to produce to topic " + topicName);
             }
         }
     }