Enable authentication and authorization for some admin APIs (#2046)
### Motivation
Currently, it seems that users who are not admins can send POST requests to the REST APIs for configuring retention policies and persistence policies.
```
POST /admin/namespaces/{tenant}/{namespace}/retention
POST /admin/persistent/{tenant}/{namespace}/persistence
```
I think that only administrators should be able to change these settings.
### Modifications
Fixed these APIs to authenticate and authorize users by calling the `validateAdminAccessForTenant()` method.
### Result
Only the administrators of the tenant will be able to change these settings.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b8a4c53..553f53a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -767,6 +767,7 @@
}
protected void internalSetRetention(RetentionPolicies retention) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
try {
@@ -804,6 +805,7 @@
}
protected void internalSetPersistence(PersistencePolicies persistence) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
validatePersistencePolicies(persistence);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 305c887..f24aad5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -64,6 +64,7 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -90,6 +91,7 @@
private List<NamespaceName> testLocalNamespaces;
private List<NamespaceName> testGlobalNamespaces;
private final String testTenant = "my-tenant";
+ private final String testOtherTenant = "other-tenant";
private final String testLocalCluster = "use";
private final String testOtherCluster = "usc";
@@ -110,6 +112,7 @@
testLocalNamespaces.add(NamespaceName.get(this.testTenant, this.testLocalCluster, "test-namespace-1"));
testLocalNamespaces.add(NamespaceName.get(this.testTenant, this.testLocalCluster, "test-namespace-2"));
testLocalNamespaces.add(NamespaceName.get(this.testTenant, this.testOtherCluster, "test-other-namespace-1"));
+ testLocalNamespaces.add(NamespaceName.get(this.testOtherTenant, this.testLocalCluster, "test-namespace-1"));
testGlobalNamespaces.add(NamespaceName.get(this.testTenant, "global", "test-global-ns1"));
@@ -133,8 +136,8 @@
doReturn(false).when(namespaces).isRequestHttps();
doReturn("test").when(namespaces).clientAppId();
doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
- doNothing().when(namespaces).validateAdminAccessForTenant("my-tenant");
- doNothing().when(namespaces).validateAdminAccessForTenant("other-tenant");
+ doNothing().when(namespaces).validateAdminAccessForTenant(this.testTenant);
+ doNothing().when(namespaces).validateAdminAccessForTenant("non-existing-tenant");
doNothing().when(namespaces).validateAdminAccessForTenant("new-property");
admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
@@ -142,11 +145,16 @@
admin.clusters().createCluster("usc", new ClusterData("http://broker-usc.com:" + BROKER_WEBSERVICE_PORT));
admin.tenants().createTenant(this.testTenant,
new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use", "usc", "usw")));
+ admin.tenants().createTenant(this.testOtherTenant,
+ new TenantInfo(Sets.newHashSet("role3", "role4"), Sets.newHashSet("use", "usc", "usw")));
- createTestNamespaces(this.testTenant, this.testLocalNamespaces, new BundlesData());
+ createTestNamespaces(this.testLocalNamespaces, new BundlesData());
createGlobalTestNamespaces(this.testTenant, this.testGlobalNamespaces.get(0).getLocalName(),
new BundlesData());
+ doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+ .validateAdminAccessForTenant(this.testOtherTenant);
+
nsSvc = pulsar.getNamespaceService();
}
@@ -159,34 +167,34 @@
@Test
public void testCreateNamespaces() throws Exception {
try {
- namespaces.createNamespace("my-tenant", "other-colo", "my-namespace", new BundlesData());
+ namespaces.createNamespace(this.testTenant, "other-colo", "my-namespace", new BundlesData());
fail("should have failed");
} catch (RestException e) {
// Ok, cluster doesn't exist
}
List<NamespaceName> nsnames = Lists.newArrayList();
- nsnames.add(NamespaceName.get("my-tenant", "use", "create-namespace-1"));
- nsnames.add(NamespaceName.get("my-tenant", "use", "create-namespace-2"));
- nsnames.add(NamespaceName.get("my-tenant", "usc", "create-other-namespace-1"));
- createTestNamespaces("my-tenant", nsnames, new BundlesData());
+ nsnames.add(NamespaceName.get(this.testTenant, "use", "create-namespace-1"));
+ nsnames.add(NamespaceName.get(this.testTenant, "use", "create-namespace-2"));
+ nsnames.add(NamespaceName.get(this.testTenant, "usc", "create-other-namespace-1"));
+ createTestNamespaces(nsnames, new BundlesData());
try {
- namespaces.createNamespace("my-tenant", "use", "create-namespace-1", new BundlesData());
+ namespaces.createNamespace(this.testTenant, "use", "create-namespace-1", new BundlesData());
fail("should have failed");
} catch (RestException e) {
// Ok, namespace already exists
}
try {
- namespaces.createNamespace("other-tenant", "use", "create-namespace-1", new BundlesData());
+ namespaces.createNamespace("non-existing-tenant", "use", "create-namespace-1", new BundlesData());
fail("should have failed");
} catch (RestException e) {
- // Ok, property doesn't exist
+ // Ok, tenant doesn't exist
}
try {
- namespaces.createNamespace("my-tenant", "use", "create-namespace-#", new BundlesData());
+ namespaces.createNamespace(this.testTenant, "use", "create-namespace-#", new BundlesData());
fail("should have failed");
} catch (RestException e) {
// Ok, invalid namespace name
@@ -195,7 +203,7 @@
mockZookKeeper.failNow(Code.SESSIONEXPIRED);
try {
- namespaces.createNamespace("my-tenant", "use", "my-namespace-3", new BundlesData());
+ namespaces.createNamespace(this.testTenant, "use", "my-namespace-3", new BundlesData());
fail("should have failed");
} catch (RestException e) {
// Ok
@@ -215,7 +223,7 @@
assertEquals(namespaces.getTenantNamespaces(this.testTenant), expectedList);
try {
- namespaces.getTenantNamespaces("other-tenant");
+ namespaces.getTenantNamespaces("non-existing-tenant");
fail("should have failed");
} catch (RestException e) {
// Ok, does not exist
@@ -971,8 +979,7 @@
namespaces.createNamespace(property, "global", namespace, bundle);
}
- private void createTestNamespaces(String property, List<NamespaceName> nsnames, BundlesData bundle)
- throws Exception {
+ private void createTestNamespaces(List<NamespaceName> nsnames, BundlesData bundle) throws Exception {
for (NamespaceName nsName : nsnames) {
namespaces.createNamespace(nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), bundle);
}
@@ -1047,6 +1054,40 @@
}
@Test
+ public void testRetentionUnauthorized() throws Exception {
+ try {
+ NamespaceName testNs = this.testLocalNamespaces.get(3);
+ RetentionPolicies retention = new RetentionPolicies(10, 10);
+ namespaces.setRetention(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), retention);
+ fail("Should fail");
+ } catch (RestException e) {
+ assertEquals(e.getResponse().getStatus(), Status.UNAUTHORIZED.getStatusCode());
+ }
+ }
+
+ @Test
+ public void testPersistence() throws Exception {
+ NamespaceName testNs = this.testLocalNamespaces.get(0);
+ PersistencePolicies persistence1 = new PersistencePolicies(3, 2, 1, 0.0);
+ namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence1);
+ PersistencePolicies persistence2 = namespaces.getPersistence(testNs.getTenant(), testNs.getCluster(),
+ testNs.getLocalName());
+ assertEquals(persistence2, persistence1);
+ }
+
+ @Test
+ public void testPersistenceUnauthorized() throws Exception {
+ try {
+ NamespaceName testNs = this.testLocalNamespaces.get(3);
+ PersistencePolicies persistence = new PersistencePolicies(3, 2, 1, 0.0);
+ namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), persistence);
+ fail("Should fail");
+ } catch (RestException e) {
+ assertEquals(e.getResponse().getStatus(), Status.UNAUTHORIZED.getStatusCode());
+ }
+ }
+
+ @Test
public void testValidateTopicOwnership() throws Exception {
try {
URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());