blob: 234af7afa8d09c2a8f7d49b6059daed1b94ee82c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import java.util.Base64;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.data.ProducerMessages;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class TopicsAuthTest extends MockedPulsarServiceBaseTest {
private final String testLocalCluster = "test";
private final String testTenant = "my-tenant";
private final String testNamespace = "my-namespace";
private final String testTopicName = "my-topic";
private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
private static final String PRODUCE_TOKEN = Jwts.builder().setSubject("producer").signWith(SECRET_KEY).compact();
private static final String CONSUME_TOKEN = Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
@Override
@BeforeMethod
protected void setup() throws Exception {
// enable auth&auth and use JWT at broker
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+ Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("admin");
conf.setSuperUserRoles(superUserRoles);
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
super.internalSetup();
PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString() : brokerUrlTls.toString())
.authentication(AuthenticationToken.class.getName(),
ADMIN_TOKEN);
admin = Mockito.spy(pulsarAdminBuilder.build());
admin.clusters().createCluster(testLocalCluster, new ClusterDataImpl());
admin.tenants().createTenant(testTenant, new TenantInfoImpl(Set.of("role1", "role2"),
Set.of(testLocalCluster)
));
admin.namespaces().createNamespace(testTenant + "/" + testNamespace,
Set.of(testLocalCluster));
admin.namespaces().grantPermissionOnNamespace(testTenant + "/" + testNamespace, "producer",
EnumSet.of(AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(testTenant + "/" + testNamespace, "consumer",
EnumSet.of(AuthAction.consume));
}
@Override
@AfterMethod
protected void cleanup() throws Exception {
super.internalCleanup();
}
@DataProvider(name = "variations")
public static Object[][] variations() {
return new Object[][]{
{CONSUME_TOKEN, 401},
{PRODUCE_TOKEN, 200}
};
}
@Test(dataProvider = "variations")
public void testProduceToNonPartitionedTopic(String token, int status) throws Exception {
innerTestProduce(testTopicName, true, false, token, status);
}
@Test(dataProvider = "variations")
public void testProduceToPartitionedTopic(String token, int status) throws Exception {
innerTestProduce(testTopicName, true, true, token, status);
}
@Test(dataProvider = "variations")
public void testProduceOnNonPersistentNonPartitionedTopic(String token, int status) throws Exception {
innerTestProduce(testTopicName, false, false, token, status);
}
@Test(dataProvider = "variations")
public void testProduceOnNonPersistentPartitionedTopic(String token, int status) throws Exception {
innerTestProduce(testTopicName, false, true, token, status);
}
private void innerTestProduce(String createTopicName, boolean isPersistent, boolean isPartition,
String token, int status) throws Exception {
String topicPrefix = null;
if (isPersistent == true) {
topicPrefix = "persistent";
} else {
topicPrefix = "non-persistent";
}
if (isPartition == true) {
admin.topics().createPartitionedTopic(topicPrefix + "://" + testTenant + "/"
+ testNamespace + "/" + createTopicName, 5);
} else {
admin.topics().createNonPartitionedTopic(topicPrefix + "://" + testTenant + "/"
+ testNamespace + "/" + createTopicName);
}
Schema<String> schema = StringSchema.utf8();
ProducerMessages producerMessages = new ProducerMessages();
producerMessages.setKeySchema(ObjectMapperFactory.getMapper().getObjectMapper().
writeValueAsString(schema.getSchemaInfo()));
producerMessages.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper().
writeValueAsString(schema.getSchemaInfo()));
String message = "[" +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
producerMessages.setMessages(createMessages(message));
WebTarget root = buildWebClient();
String requestPath = null;
if (isPartition == true) {
requestPath = "/topics/" + topicPrefix + "/" + testTenant + "/" + testNamespace + "/"
+ createTopicName + "/partitions/2";
} else {
requestPath = "/topics/" + topicPrefix + "/" + testTenant + "/" + testNamespace + "/" + createTopicName;
}
Response response = root.path(requestPath)
.request(MediaType.APPLICATION_JSON)
.header("Authorization", "Bearer " + token)
.post(Entity.json(producerMessages));
Assert.assertEquals(response.getStatus(), status);
}
private static List<ProducerMessage> createMessages(String message) throws JsonProcessingException {
return ObjectMapperFactory.getMapper().reader()
.forType(new TypeReference<List<ProducerMessage>>() {
}).readValue(message);
}
WebTarget buildWebClient() throws Exception {
ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
httpConfig.register(MultiPartFeature.class);
javax.ws.rs.client.ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(httpConfig);
Client client = clientBuilder.build();
return client.target(brokerUrl.toString());
}
}