[improve][broker] Authorize originalPrincipal when provided (#19830)

(cherry picked from commit 6bc3530628344570cbd9171485e0478c6f01eab4)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 6a665d0..ccb7beb 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -341,14 +341,10 @@
 
     /**
      * Validates that the authenticatedPrincipal and the originalPrincipal are a valid combination.
-     * Valid combinations fulfill one of the following two rules:
+     * Valid combinations fulfills the following rule:
      * <p>
-     * 1. The authenticatedPrincipal is in {@link ServiceConfiguration#getProxyRoles()}, if, and only if,
+     * The authenticatedPrincipal is in {@link ServiceConfiguration#getProxyRoles()}, if, and only if,
      * the originalPrincipal is set to a role that is not also in {@link ServiceConfiguration#getProxyRoles()}.
-     * <p>
-     * 2. The authenticatedPrincipal and the originalPrincipal are the same, but are not a proxyRole, when
-     * allowNonProxyPrincipalsToBeEqual is true.
-     *
      * @return true when roles are a valid combination and false when roles are an invalid combination
      */
     public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
@@ -364,7 +360,9 @@
             }
         } else if (StringUtils.isNotBlank(originalPrincipal)
                 && !(allowNonProxyPrincipalsToBeEqual && originalPrincipal.equals(authenticatedPrincipal))) {
-            errorMsg = "cannot specify originalPrincipal when connecting without valid proxy role.";
+                log.warn("[{}] Non-proxy role [{}] passed originalPrincipal [{}]. This behavior will not "
+                        + "be allowed in a future release. A proxy's role must be in the broker's proxyRoles "
+                        + "configuration.", remoteAddress, authenticatedPrincipal, originalPrincipal);
         }
         if (errorMsg != null) {
             log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress,
@@ -415,7 +413,7 @@
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync(
                     tenantName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTenantOperationAsync(
@@ -482,7 +480,7 @@
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync(
                     namespaceName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespaceOperationAsync(
@@ -547,7 +545,7 @@
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
                     namespaceName, policy, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespacePolicyOperationAsync(
@@ -615,7 +613,7 @@
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicPolicyOperationAsync(
                     topicName, policy, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTopicPolicyOperationAsync(
@@ -708,7 +706,7 @@
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicOperationAsync(
                     topicName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTopicOperationAsync(
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
new file mode 100644
index 0000000..54747f9
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.HashSet;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class AuthorizationServiceTest {
+
+    AuthorizationService authorizationService;
+
+    @BeforeClass
+    void beforeClass() throws PulsarServerException {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setAuthorizationEnabled(true);
+        // Consider both of these proxy roles to make testing more comprehensive
+        HashSet<String> proxyRoles = new HashSet<>();
+        proxyRoles.add("pass.proxy");
+        proxyRoles.add("fail.proxy");
+        conf.setProxyRoles(proxyRoles);
+        conf.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
+        authorizationService = new AuthorizationService(conf, null);
+    }
+
+    /**
+     * See {@link MockAuthorizationProvider} for the implementation of the mock authorization provider.
+     */
+    @DataProvider(name = "roles")
+    public Object[][] encryptionProvider() {
+        return new Object[][]{
+                // Schema: role, originalRole, whether authorization should pass
+
+                // Client conditions where original role isn't passed or is blank
+                {"pass.client", null, Boolean.TRUE},
+                {"pass.client", " ", Boolean.TRUE},
+                {"fail.client", null, Boolean.FALSE},
+                {"fail.client", " ", Boolean.FALSE},
+
+                // Proxy conditions where original role isn't passed or is blank
+                {"pass.proxy", null, Boolean.FALSE},
+                {"pass.proxy", " ", Boolean.FALSE},
+                {"fail.proxy", null, Boolean.FALSE},
+                {"fail.proxy", " ", Boolean.FALSE},
+
+                // Normal proxy and client conditions
+                {"pass.proxy", "pass.client", Boolean.TRUE},
+                {"pass.proxy", "fail.client", Boolean.FALSE},
+                {"fail.proxy", "pass.client", Boolean.FALSE},
+                {"fail.proxy", "fail.client", Boolean.FALSE},
+
+                // Not proxy with original principal
+                {"pass.not-proxy", "pass.client", Boolean.TRUE},
+                {"pass.not-proxy", "fail.client", Boolean.FALSE},
+                {"fail.not-proxy", "pass.client", Boolean.FALSE},
+                {"fail.not-proxy", "fail.client", Boolean.FALSE},
+
+                // Covers an unlikely scenario, but valid in the context of this test
+                {null, "pass.proxy", Boolean.FALSE},
+        };
+    }
+
+    private void checkResult(boolean expected, boolean actual) {
+        if (expected) {
+            assertTrue(actual);
+        } else {
+            assertFalse(actual);
+        }
+    }
+
+    @Test(dataProvider = "roles")
+    public void testAllowTenantOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = authorizationService.allowTenantOperationAsync("tenant",
+                TenantOperation.DELETE_NAMESPACE, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testNamespaceOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"),
+                NamespaceOperation.PACKAGES, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testTopicOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"),
+                TopicOperation.PRODUCE, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testNamespacePolicyOperationAsync(String role, String originalRole, boolean shouldPass)
+            throws Exception {
+        boolean isAuthorized = authorizationService.allowNamespacePolicyOperationAsync(
+                NamespaceName.get("public/default"), PolicyName.ALL, PolicyOperation.READ, originalRole, role, null)
+                .get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testTopicPolicyOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = authorizationService.allowTopicPolicyOperationAsync(TopicName.get("topic"),
+                PolicyName.ALL, PolicyOperation.READ, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
new file mode 100644
index 0000000..beb0b87
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Mock implementation of the authorization provider interface used for testing.
+ * A role is authorized if it starts with "pass".
+ */
+public class MockAuthorizationProvider implements AuthorizationProvider {
+
+    private CompletableFuture<Boolean> shouldPass(String role) {
+        return CompletableFuture.completedFuture(role != null && role.startsWith("pass"));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+                                                      AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+                                                      AuthenticationDataSource authenticationData, String subscription) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+                                                     AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+                                                            AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
+                                                          AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation, String role, AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic, String role, PolicyName policy, PolicyOperation operation, AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+
+    @Override
+    public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+        return null;
+    }
+
+
+    @Override
+    public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
+                                                        String authDataJson) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+                                                                    Set<String> roles, String authDataJson) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+                                                                     String role, String authDataJson) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
+                                                        String authDataJson) {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index c55b977..6fb8607 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -191,7 +191,8 @@
             String originalPrincipal = originalPrincipal();
             validateOriginalPrincipal(appId, originalPrincipal);
 
-            if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
+            if (pulsar.getConfiguration().getProxyRoles().contains(appId)
+                    || StringUtils.isNotBlank(originalPrincipal())) {
 
                 CompletableFuture<Boolean> proxyAuthorizedFuture;
                 CompletableFuture<Boolean> originalPrincipalAuthorizedFuture;
@@ -268,7 +269,8 @@
                             throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
                         }
                         validateOriginalPrincipal(clientAppId, originalPrincipal);
-                        if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                        if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)
+                                || StringUtils.isNotBlank(originalPrincipal)) {
                             AuthorizationService authorizationService =
                                     pulsar.getBrokerService().getAuthorizationService();
                             return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 046ada2..832b0d4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -251,7 +251,8 @@
 
         // Edge cases that differ because binary protocol and http protocol have different expectations
         assertTrue(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, true));
-        assertFalse(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, false));
+        // This assert flips to assertFalse in the 3.0 release.
+        assertTrue(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, false));
 
         // Only likely in cases when authentication is disabled, but we still define these to be valid.
         assertTrue(auth.isValidOriginalPrincipal(null, null, (SocketAddress) null, false));
@@ -265,9 +266,10 @@
 
         // OriginalPrincipal cannot be proxy role
         assertFalse(auth.isValidOriginalPrincipal("proxy", "proxy", (SocketAddress) null, false));
-        assertFalse(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null, false));
-        assertFalse(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null, false));
-        assertFalse(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null, false));
+        // The next 3 asserts flip to assertFalse in the 3.0 release.
+        assertTrue(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null, false));
+        assertTrue(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null, false));
+        assertTrue(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null, false));
 
         // Must gracefully handle a missing AuthenticationDataSource
         assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (AuthenticationDataSource) null));