blob: 465cc6d52fb96f3ec32ea968fe00ca36674db65a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
private final static String clientRole = "plugbleRole";
protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("superUser");
conf.setSuperUserRoles(superUserRoles);
conf.setBrokerClientAuthenticationPlugin(TestAuthenticationProvider.class.getName());
Set<String> providers = new HashSet<>();
providers.add(TestAuthenticationProvider.class.getName());
conf.setAuthenticationProviders(providers);
conf.setClusterName("use");
super.init();
}
@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
/**
* It verifies plugable authorization service
*
* <pre>
* 1. Client passes correct authorization plugin-name + correct auth role: SUCCESS
* 2. Client passes correct authorization plugin-name + incorrect auth-role: FAIL
* 3. Client passes incorrect authorization plugin-name + correct auth-role: FAIL
* </pre>
*
* @throws Exception
*/
@Test
public void testProducerAndConsumerAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
setup();
ClientConfiguration adminConf = new ClientConfiguration();
Authentication adminAuthentication = new ClientAuthentication("superUser");
adminConf.setAuthentication(adminAuthentication);
admin = spy(new PulsarAdmin(brokerUrl, adminConf));
String lookupUrl;
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
ClientConfiguration clientConfValid = new ClientConfiguration();
Authentication authentication = new ClientAuthentication(clientRole);
clientConfValid.setAuthentication(authentication);
ClientConfiguration clientConfInvalidRole = new ClientConfiguration();
Authentication authenticationInvalidRole = new ClientAuthentication("test-role");
clientConfInvalidRole.setAuthentication(authenticationInvalidRole);
pulsarClient = PulsarClient.create(lookupUrl, clientConfValid);
PulsarClient pulsarClientInvalidRole = PulsarClient.create(lookupUrl, clientConfInvalidRole);
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");
// (1) Valid Producer and consumer creation
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic", "my-subscriber-name");
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic");
consumer.close();
producer.close();
// (2) InValid user auth-role will be rejected by authorization service
try {
consumer = pulsarClientInvalidRole.subscribe("persistent://my-property/use/my-ns/my-topic",
"my-subscriber-name");
Assert.fail("should have failed with authorization error");
} catch (PulsarClientException.AuthorizationException pa) {
// Ok
}
try {
producer = pulsarClientInvalidRole.createProducer("persistent://my-property/use/my-ns/my-topic");
Assert.fail("should have failed with authorization error");
} catch (PulsarClientException.AuthorizationException pa) {
// Ok
}
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
setup();
ClientConfiguration adminConf = new ClientConfiguration();
Authentication adminAuthentication = new ClientAuthentication("superUser");
adminConf.setAuthentication(adminAuthentication);
admin = spy(new PulsarAdmin(brokerUrl, adminConf));
String lookupUrl;
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
ClientConfiguration clientConfValid = new ClientConfiguration();
Authentication authentication = new ClientAuthentication(clientRole);
clientConfValid.setAuthentication(authentication);
pulsarClient = PulsarClient.create(lookupUrl, clientConfValid);
admin.properties().createProperty("prop-prefix",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("prop-prefix/use/ns");
// (1) Valid subscription name will be approved by authorization service
Consumer consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", clientRole + "-sub1");
consumer.close();
// (2) InValid subscription name will be rejected by authorization service
try {
consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", "sub1");
Assert.fail("should have failed with authorization error");
} catch (PulsarClientException.AuthorizationException pa) {
// Ok
}
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testGrantPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
conf.setAuthorizationProvider(TestAuthorizationProviderWithGrantPermission.class.getName());
setup();
AuthorizationService authorizationService = new AuthorizationService(conf, null);
DestinationName destination = DestinationName.get("persistent://prop/cluster/ns/t1");
String role = "test-role";
Assert.assertFalse(authorizationService.canProduce(destination, role, null));
Assert.assertFalse(authorizationService.canConsume(destination, role, null, "sub1"));
authorizationService
.grantPermissionAsync(destination, null, role, "auth-json").get();
Assert.assertTrue(authorizationService.canProduce(destination, role, null));
Assert.assertTrue(authorizationService.canConsume(destination, role, null, "sub1"));
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testAuthData() throws Exception {
log.info("-- Starting {} test --", methodName);
conf.setAuthorizationProvider(TestAuthorizationProviderWithGrantPermission.class.getName());
setup();
AuthorizationService authorizationService = new AuthorizationService(conf, null);
DestinationName destination = DestinationName.get("persistent://prop/cluster/ns/t1");
String role = "test-role";
authorizationService
.grantPermissionAsync(destination, null, role, "auth-json")
.get();
Assert.assertEquals(TestAuthorizationProviderWithGrantPermission.authDataJson, "auth-json");
Assert.assertTrue(
authorizationService.canProduce(destination, role, new AuthenticationDataCommand("prod-auth")));
Assert.assertEquals(TestAuthorizationProviderWithGrantPermission.authenticationData.getCommandData(),
"prod-auth");
Assert.assertTrue(authorizationService.canConsume(destination, role, new AuthenticationDataCommand("cons-auth"),
"sub1"));
Assert.assertEquals(TestAuthorizationProviderWithGrantPermission.authenticationData.getCommandData(),
"cons-auth");
log.info("-- Exiting {} test --", methodName);
}
public static class ClientAuthentication implements Authentication {
String user;
public ClientAuthentication(String user) {
this.user = user;
}
@Override
public void close() throws IOException {
// No-op
}
@Override
public String getAuthMethodName() {
return "test";
}
@Override
public AuthenticationDataProvider getAuthData() throws PulsarClientException {
AuthenticationDataProvider provider = new AuthenticationDataProvider() {
public boolean hasDataForHttp() {
return true;
}
@SuppressWarnings("unchecked")
public Set<Map.Entry<String, String>> getHttpHeaders() {
return Sets.newHashSet(Maps.immutableEntry("user", user));
}
public boolean hasDataFromCommand() {
return true;
}
public String getCommandData() {
return user;
}
};
return provider;
}
@Override
public void configure(Map<String, String> authParams) {
// No-op
}
@Override
public void start() throws PulsarClientException {
// No-op
}
}
public static class TestAuthenticationProvider implements AuthenticationProvider {
@Override
public void close() throws IOException {
// no-op
}
@Override
public void initialize(ServiceConfiguration config) throws IOException {
// No-op
}
@Override
public String getAuthMethodName() {
return "test";
}
@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
return authData.getCommandData() != null ? authData.getCommandData() : authData.getHttpHeader("user");
}
}
public static class TestAuthorizationProvider implements AuthorizationProvider {
@Override
public void close() throws IOException {
// No-op
}
@Override
public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
// No-op
}
@Override
public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(clientRole.equals(role));
}
@Override
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData, String subscription) {
return CompletableFuture.completedFuture(clientRole.equals(role));
}
@Override
public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(clientRole.equals(role));
}
@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
String role, String authenticationData) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions,
String role, String authenticationData) {
return CompletableFuture.completedFuture(null);
}
}
/**
* This provider always fails authorization on consumer and passes on producer
*
*/
public static class TestAuthorizationProvider2 extends TestAuthorizationProvider {
@Override
public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(true);
}
@Override
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData, String subscription) {
return CompletableFuture.completedFuture(false);
}
@Override
public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(true);
}
}
public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
@Override
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData, String subscription) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
if (isNotBlank(subscription)) {
if (!subscription.startsWith(role)) {
future.completeExceptionally(new PulsarServerException(
"The subscription name needs to be prefixed by the authentication role"));
}
}
future.complete(clientRole.equals(role));
return future;
}
}
public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
private Set<String> grantRoles = Sets.newHashSet();
static AuthenticationDataSource authenticationData;
static String authDataJson;
@Override
public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData) {
this.authenticationData = authenticationData;
return CompletableFuture.completedFuture(grantRoles.contains(role));
}
@Override
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData, String subscription) {
this.authenticationData = authenticationData;
return CompletableFuture.completedFuture(grantRoles.contains(role));
}
@Override
public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData) {
this.authenticationData = authenticationData;
return CompletableFuture.completedFuture(grantRoles.contains(role));
}
@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
String role, String authData) {
this.authDataJson = authData;
grantRoles.add(role);
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions,
String role, String authData) {
this.authDataJson = authData;
grantRoles.add(role);
return CompletableFuture.completedFuture(null);
}
}
}