blob: bcb8e3233a093f683befb2904fb8f53b59ace29d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import static org.awaitility.Awaitility.await;
import io.jsonwebtoken.Jwts;
import java.util.Set;
import java.util.UUID;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {
private PulsarAdmin superUserAdmin;
private PulsarAdmin tenantManagerAdmin;
private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString();
private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
@SneakyThrows
@BeforeClass
public void before() {
configureTokenAuthentication();
configureDefaultAuthorization();
start();
this.superUserAdmin =PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
.build();
final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public");
tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
superUserAdmin.tenants().updateTenant("public", tenantInfo);
this.tenantManagerAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
.build();
}
@SneakyThrows
@AfterClass
public void after() {
close();
}
@SneakyThrows
@Test
public void testRetention() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
final RetentionPolicies definedRetentionPolicy = new RetentionPolicies(1, 1);
// test superuser
superUserAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
// because the topic policies is eventual consistency, we should wait here
await().untilAsserted(() -> {
final RetentionPolicies receivedRetentionPolicy = superUserAdmin.topicPolicies().getRetention(topic);
Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
});
superUserAdmin.topicPolicies().removeRetention(topic);
await().untilAsserted(() -> {
final RetentionPolicies retention = superUserAdmin.topicPolicies().getRetention(topic);
Assert.assertNull(retention);
});
// test tenant manager
tenantManagerAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
await().untilAsserted(() -> {
final RetentionPolicies receivedRetentionPolicy = tenantManagerAdmin.topicPolicies().getRetention(topic);
Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
});
tenantManagerAdmin.topicPolicies().removeRetention(topic);
await().untilAsserted(() -> {
final RetentionPolicies retention = tenantManagerAdmin.topicPolicies().getRetention(topic);
Assert.assertNull(retention);
});
// test nobody
try {
subAdmin.topicPolicies().getRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().removeRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
// test sub user with permissions
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
subject, Set.of(action));
try {
subAdmin.topicPolicies().getRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().removeRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
}
}
@SneakyThrows
@Test
public void testOffloadPolicy() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
// mocked data
final OffloadPoliciesImpl definedOffloadPolicies = new OffloadPoliciesImpl();
definedOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(100L);
definedOffloadPolicies.setManagedLedgerOffloadThresholdInSeconds(100L);
definedOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(200L);
definedOffloadPolicies.setManagedLedgerOffloadDriver("s3");
definedOffloadPolicies.setManagedLedgerOffloadBucket("buck");
// test superuser
superUserAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
// because the topic policies is eventual consistency, we should wait here
await().untilAsserted(() -> {
final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic);
Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
});
superUserAdmin.topicPolicies().removeOffloadPolicies(topic);
await().untilAsserted(() -> {
final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic);
Assert.assertNull(offloadPolicy);
});
// test tenant manager
tenantManagerAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
await().untilAsserted(() -> {
final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
});
tenantManagerAdmin.topicPolicies().removeOffloadPolicies(topic);
await().untilAsserted(() -> {
final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
Assert.assertNull(offloadPolicy);
});
// test nobody
try {
subAdmin.topicPolicies().getOffloadPolicies(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().removeOffloadPolicies(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
// test sub user with permissions
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
subject, Set.of(action));
try {
subAdmin.topicPolicies().getOffloadPolicies(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().removeOffloadPolicies(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
}
}
@SneakyThrows
@Test
public void testMaxUnackedMessagesOnConsumer() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
@Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
// mocked data
int definedUnackedMessagesOnConsumer = 100;
// test superuser
superUserAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
// because the topic policies is eventual consistency, we should wait here
await().untilAsserted(() -> {
final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
.getMaxUnackedMessagesOnConsumer(topic);
Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
});
superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
await().untilAsserted(() -> {
final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
Assert.assertNull(unackedMessagesOnConsumer);
});
// test tenant manager
tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
await().untilAsserted(() -> {
final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
});
tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
await().untilAsserted(() -> {
final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
Assert.assertNull(unackedMessagesOnConsumer);
});
// test nobody
try {
subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
// test sub user with permissions
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
subject, Set.of(action));
try {
subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
}
}
@SneakyThrows
@Test
public void testMaxUnackedMessagesOnSubscription() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
@Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
// mocked data
int definedUnackedMessagesOnConsumer = 100;
// test superuser
superUserAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
// because the topic policies is eventual consistency, we should wait here
await().untilAsserted(() -> {
final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
.getMaxUnackedMessagesOnSubscription(topic);
Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
});
superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
await().untilAsserted(() -> {
final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
.getMaxUnackedMessagesOnSubscription(topic);
Assert.assertNull(unackedMessagesOnConsumer);
});
// test tenant manager
tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
await().untilAsserted(() -> {
final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
});
tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
await().untilAsserted(() -> {
final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies()
.getMaxUnackedMessagesOnSubscription(topic);
Assert.assertNull(unackedMessagesOnConsumer);
});
// test nobody
try {
subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
// test sub user with permissions
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
subject, Set.of(action));
try {
subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
try {
subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
}
}
}