[improve][broker] Authorize originalPrincipal when provided (#19830)
(cherry picked from commit 6bc3530628344570cbd9171485e0478c6f01eab4)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 6a665d0..ccb7beb 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -341,14 +341,10 @@
/**
* Validates that the authenticatedPrincipal and the originalPrincipal are a valid combination.
- * Valid combinations fulfill one of the following two rules:
+ * Valid combinations fulfills the following rule:
* <p>
- * 1. The authenticatedPrincipal is in {@link ServiceConfiguration#getProxyRoles()}, if, and only if,
+ * The authenticatedPrincipal is in {@link ServiceConfiguration#getProxyRoles()}, if, and only if,
* the originalPrincipal is set to a role that is not also in {@link ServiceConfiguration#getProxyRoles()}.
- * <p>
- * 2. The authenticatedPrincipal and the originalPrincipal are the same, but are not a proxyRole, when
- * allowNonProxyPrincipalsToBeEqual is true.
- *
* @return true when roles are a valid combination and false when roles are an invalid combination
*/
public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
@@ -364,7 +360,9 @@
}
} else if (StringUtils.isNotBlank(originalPrincipal)
&& !(allowNonProxyPrincipalsToBeEqual && originalPrincipal.equals(authenticatedPrincipal))) {
- errorMsg = "cannot specify originalPrincipal when connecting without valid proxy role.";
+ log.warn("[{}] Non-proxy role [{}] passed originalPrincipal [{}]. This behavior will not "
+ + "be allowed in a future release. A proxy's role must be in the broker's proxyRoles "
+ + "configuration.", remoteAddress, authenticatedPrincipal, originalPrincipal);
}
if (errorMsg != null) {
log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress,
@@ -415,7 +413,7 @@
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync(
tenantName, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTenantOperationAsync(
@@ -482,7 +480,7 @@
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync(
namespaceName, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespaceOperationAsync(
@@ -547,7 +545,7 @@
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespacePolicyOperationAsync(
@@ -615,7 +613,7 @@
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicPolicyOperationAsync(
topicName, policy, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTopicPolicyOperationAsync(
@@ -708,7 +706,7 @@
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicOperationAsync(
topicName, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTopicOperationAsync(
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
new file mode 100644
index 0000000..54747f9
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.HashSet;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class AuthorizationServiceTest {
+
+ AuthorizationService authorizationService;
+
+ @BeforeClass
+ void beforeClass() throws PulsarServerException {
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setAuthorizationEnabled(true);
+ // Consider both of these proxy roles to make testing more comprehensive
+ HashSet<String> proxyRoles = new HashSet<>();
+ proxyRoles.add("pass.proxy");
+ proxyRoles.add("fail.proxy");
+ conf.setProxyRoles(proxyRoles);
+ conf.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
+ authorizationService = new AuthorizationService(conf, null);
+ }
+
+ /**
+ * See {@link MockAuthorizationProvider} for the implementation of the mock authorization provider.
+ */
+ @DataProvider(name = "roles")
+ public Object[][] encryptionProvider() {
+ return new Object[][]{
+ // Schema: role, originalRole, whether authorization should pass
+
+ // Client conditions where original role isn't passed or is blank
+ {"pass.client", null, Boolean.TRUE},
+ {"pass.client", " ", Boolean.TRUE},
+ {"fail.client", null, Boolean.FALSE},
+ {"fail.client", " ", Boolean.FALSE},
+
+ // Proxy conditions where original role isn't passed or is blank
+ {"pass.proxy", null, Boolean.FALSE},
+ {"pass.proxy", " ", Boolean.FALSE},
+ {"fail.proxy", null, Boolean.FALSE},
+ {"fail.proxy", " ", Boolean.FALSE},
+
+ // Normal proxy and client conditions
+ {"pass.proxy", "pass.client", Boolean.TRUE},
+ {"pass.proxy", "fail.client", Boolean.FALSE},
+ {"fail.proxy", "pass.client", Boolean.FALSE},
+ {"fail.proxy", "fail.client", Boolean.FALSE},
+
+ // Not proxy with original principal
+ {"pass.not-proxy", "pass.client", Boolean.TRUE},
+ {"pass.not-proxy", "fail.client", Boolean.FALSE},
+ {"fail.not-proxy", "pass.client", Boolean.FALSE},
+ {"fail.not-proxy", "fail.client", Boolean.FALSE},
+
+ // Covers an unlikely scenario, but valid in the context of this test
+ {null, "pass.proxy", Boolean.FALSE},
+ };
+ }
+
+ private void checkResult(boolean expected, boolean actual) {
+ if (expected) {
+ assertTrue(actual);
+ } else {
+ assertFalse(actual);
+ }
+ }
+
+ @Test(dataProvider = "roles")
+ public void testAllowTenantOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+ boolean isAuthorized = authorizationService.allowTenantOperationAsync("tenant",
+ TenantOperation.DELETE_NAMESPACE, originalRole, role, null).get();
+ checkResult(shouldPass, isAuthorized);
+ }
+
+ @Test(dataProvider = "roles")
+ public void testNamespaceOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+ boolean isAuthorized = authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"),
+ NamespaceOperation.PACKAGES, originalRole, role, null).get();
+ checkResult(shouldPass, isAuthorized);
+ }
+
+ @Test(dataProvider = "roles")
+ public void testTopicOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+ boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"),
+ TopicOperation.PRODUCE, originalRole, role, null).get();
+ checkResult(shouldPass, isAuthorized);
+ }
+
+ @Test(dataProvider = "roles")
+ public void testNamespacePolicyOperationAsync(String role, String originalRole, boolean shouldPass)
+ throws Exception {
+ boolean isAuthorized = authorizationService.allowNamespacePolicyOperationAsync(
+ NamespaceName.get("public/default"), PolicyName.ALL, PolicyOperation.READ, originalRole, role, null)
+ .get();
+ checkResult(shouldPass, isAuthorized);
+ }
+
+ @Test(dataProvider = "roles")
+ public void testTopicPolicyOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+ boolean isAuthorized = authorizationService.allowTopicPolicyOperationAsync(TopicName.get("topic"),
+ PolicyName.ALL, PolicyOperation.READ, originalRole, role, null).get();
+ checkResult(shouldPass, isAuthorized);
+ }
+}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
new file mode 100644
index 0000000..beb0b87
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Mock implementation of the authorization provider interface used for testing.
+ * A role is authorized if it starts with "pass".
+ */
+public class MockAuthorizationProvider implements AuthorizationProvider {
+
+ private CompletableFuture<Boolean> shouldPass(String role) {
+ return CompletableFuture.completedFuture(role != null && role.startsWith("pass"));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData, String subscription) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation, String role, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic, String role, PolicyName policy, PolicyOperation operation, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+
+ @Override
+ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+ return null;
+ }
+
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
+ String authDataJson) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+ Set<String> roles, String authDataJson) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+ String role, String authDataJson) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
+ String authDataJson) {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index c55b977..6fb8607 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -191,7 +191,8 @@
String originalPrincipal = originalPrincipal();
validateOriginalPrincipal(appId, originalPrincipal);
- if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
+ if (pulsar.getConfiguration().getProxyRoles().contains(appId)
+ || StringUtils.isNotBlank(originalPrincipal())) {
CompletableFuture<Boolean> proxyAuthorizedFuture;
CompletableFuture<Boolean> originalPrincipalAuthorizedFuture;
@@ -268,7 +269,8 @@
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}
validateOriginalPrincipal(clientAppId, originalPrincipal);
- if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)
+ || StringUtils.isNotBlank(originalPrincipal)) {
AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 046ada2..832b0d4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -251,7 +251,8 @@
// Edge cases that differ because binary protocol and http protocol have different expectations
assertTrue(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, true));
- assertFalse(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, false));
+ // This assert flips to assertFalse in the 3.0 release.
+ assertTrue(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, false));
// Only likely in cases when authentication is disabled, but we still define these to be valid.
assertTrue(auth.isValidOriginalPrincipal(null, null, (SocketAddress) null, false));
@@ -265,9 +266,10 @@
// OriginalPrincipal cannot be proxy role
assertFalse(auth.isValidOriginalPrincipal("proxy", "proxy", (SocketAddress) null, false));
- assertFalse(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null, false));
- assertFalse(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null, false));
- assertFalse(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null, false));
+ // The next 3 asserts flip to assertFalse in the 3.0 release.
+ assertTrue(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null, false));
+ assertTrue(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null, false));
+ assertTrue(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null, false));
// Must gracefully handle a missing AuthenticationDataSource
assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (AuthenticationDataSource) null));