blob: 49ee9892d7de0d192eb753d1edba0a7808ba3ff8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
public class PatternTopicsConsumerImplAuthTest extends ProducerConsumerBase {
private static final long testTimeout = 90000; // 1.5 min
private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImplAuthTest.class);
private static final String clientRole = "pluggableRole";
private static final String superUserRole = "superUser";
private static final Set<String> clientAuthProviderSupportedRoles = Sets.newHashSet(clientRole);
@Override
@BeforeMethod
public void setup() throws Exception {
// set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
isTcpLookup = true;
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add(superUserRole);
conf.setSuperUserRoles(superUserRoles);
Set<String> providers = new HashSet<>();
providers.add(TestAuthenticationProvider.class.getName());
conf.setAuthenticationProviders(providers);
conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
conf.setClusterName("test");
super.internalSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
private PulsarAdmin buildAdminClient() throws Exception {
Authentication adminAuthentication = new ClientAuthentication(superUserRole);
return PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
.authentication(adminAuthentication)
.build();
}
// verify binary proto with correct auth check
// if with invalid role, consumer for pattern topic subscription will fail
@Test(timeOut = testTimeout)
public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
String key = "BinaryProtoToGetTopics";
String subscriptionName = "my-ex-subscription-" + key;
String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key;
Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");
@Cleanup
PulsarAdmin admin = buildAdminClient();
String lookupUrl = pulsar.getBrokerServiceUrl();
Authentication authentication = new ClientAuthentication(clientRole);
Authentication authenticationInvalidRole = new ClientAuthentication("test-role");
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication)
.operationTimeout(1000, TimeUnit.MILLISECONDS).build();
@Cleanup
PulsarClient pulsarClientInvalidRole = PulsarClient.builder().serviceUrl(lookupUrl)
.operationTimeout(1000, TimeUnit.MILLISECONDS)
.authentication(authenticationInvalidRole).build();
// 1. create partition and grant permissions
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
admin.namespaces().grantPermissionOnNamespace("my-property/my-ns", clientRole,
EnumSet.allOf(AuthAction.class));
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);
// 2. create producers and consumer
String messagePredicate = "my-message-" + key + "-";
int totalMessages = 30;
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
.enableBatching(false)
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
.create();
Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
.enableBatching(false)
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
.create();
Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4)
.enableBatching(false)
.create();
Consumer<byte[]> consumer;
// Invalid user auth-role will be rejected by authorization service
try {
consumer = pulsarClientInvalidRole.newConsumer()
.topicsPattern(pattern)
.patternAutoDiscoveryPeriod(2)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(4)
.subscribe();
Assert.fail("should have failed with authorization error");
} catch (PulsarClientException.AuthorizationException pa) {
// Ok
}
// create pattern topics consumer with correct role client
consumer = pulsarClient.newConsumer()
.topicsPattern(pattern)
.patternAutoDiscoveryPeriod(2)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(4)
.subscribe();
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
// 4. verify consumer
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
IntStream.range(0, topics.size()).forEach(index ->
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
// 5. produce data
for (int i = 0; i < totalMessages / 3; i++) {
producer1.send((messagePredicate + "producer1-" + i).getBytes());
producer2.send((messagePredicate + "producer2-" + i).getBytes());
producer3.send((messagePredicate + "producer3-" + i).getBytes());
producer4.send((messagePredicate + "producer4-" + i).getBytes());
}
// 6. should receive all the message
int messageSet = 0;
Message<byte[]> message = consumer.receive();
do {
assertTrue(message instanceof TopicMessageImpl);
messageSet ++;
consumer.acknowledge(message);
log.debug("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet, totalMessages);
consumer.unsubscribe();
consumer.close();
producer1.close();
producer2.close();
producer3.close();
producer4.close();
}
public static class TestAuthorizationProvider implements AuthorizationProvider {
public ServiceConfiguration conf;
@Override
public void close() throws IOException {
// No-op
}
@Override
public void initialize(ServiceConfiguration conf, PulsarResources resources) throws IOException {
this.conf = conf;
// No-op
}
@Override
public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData, String subscription) {
return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return null;
}
@Override
public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return null;
}
@Override
public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return null;
}
@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
String role, String authenticationData) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> grantPermissionAsync(TopicName topicname, Set<AuthAction> actions, String role,
String authenticationData) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace,
String subscriptionName, Set<String> roles, String authDataJson) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace,
String subscriptionName, String role, String authDataJson) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(true);
}
@Override
public CompletableFuture<Boolean> allowTenantOperationAsync(
String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
return CompletableFuture.completedFuture(true);
}
@Override
public Boolean allowTenantOperation(
String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
return true;
}
@Override
public CompletableFuture<Boolean> allowNamespaceOperationAsync(
NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
CompletableFuture<Boolean> isAuthorizedFuture;
if (role.equals(superUserRole) || role.equals(clientRole)) {
isAuthorizedFuture = CompletableFuture.completedFuture(true);
} else {
isAuthorizedFuture = CompletableFuture.completedFuture(false);
}
return isAuthorizedFuture;
}
@Override
public Boolean allowNamespaceOperation(
NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
try {
return allowNamespaceOperationAsync(namespaceName, role, operation, authData).get();
} catch (InterruptedException | ExecutionException e) {
throw new RestException(e);
}
}
@Override
public CompletableFuture<Boolean> allowTopicOperationAsync(
TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
CompletableFuture<Boolean> isAuthorizedFuture;
if (role.equals(superUserRole) || role.equals(clientRole)) {
isAuthorizedFuture = CompletableFuture.completedFuture(true);
} else {
isAuthorizedFuture = CompletableFuture.completedFuture(false);
}
return isAuthorizedFuture;
}
@Override
public Boolean allowTopicOperation(
TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) {
try {
return allowTopicOperationAsync(topicName, role, operation, authData).get();
} catch (InterruptedException | ExecutionException e) {
throw new RestException(e);
}
}
@Override
public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic, String role,
PolicyName policy, PolicyOperation operation,
AuthenticationDataSource authData) {
CompletableFuture<Boolean> isAuthorizedFuture;
if (role.equals(superUserRole) || role.equals(clientRole)) {
isAuthorizedFuture = CompletableFuture.completedFuture(true);
} else {
isAuthorizedFuture = CompletableFuture.completedFuture(false);
}
return isAuthorizedFuture;
}
@Override
public Boolean allowTopicPolicyOperation(TopicName topicName, String role, PolicyName policy,
PolicyOperation operation, AuthenticationDataSource authData) {
try {
return allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData).get();
} catch (InterruptedException | ExecutionException e) {
throw new RestException(e);
}
}
}
public static class ClientAuthentication implements Authentication {
String user;
public ClientAuthentication(String user) {
this.user = user;
}
@Override
public void close() throws IOException {
// No-op
}
@Override
public String getAuthMethodName() {
return "test";
}
@Override
public AuthenticationDataProvider getAuthData() throws PulsarClientException {
AuthenticationDataProvider provider = new AuthenticationDataProvider() {
public boolean hasDataForHttp() {
return true;
}
@SuppressWarnings("unchecked")
public Set<Map.Entry<String, String>> getHttpHeaders() {
return Sets.newHashSet(Maps.immutableEntry("user", user));
}
public boolean hasDataFromCommand() {
return true;
}
public String getCommandData() {
return user;
}
};
return provider;
}
@Override
public void configure(Map<String, String> authParams) {
// No-op
}
@Override
public void start() throws PulsarClientException {
// No-op
}
}
public static class TestAuthenticationProvider implements AuthenticationProvider {
@Override
public void close() throws IOException {
// no-op
}
@Override
public void initialize(ServiceConfiguration config) throws IOException {
// No-op
}
@Override
public String getAuthMethodName() {
return "test";
}
@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
return authData.getCommandData() != null ? authData.getCommandData() : authData.getHttpHeader("user");
}
}
}