blob: 23015beb0b1fbaeee3b05404b24e94d768cca669 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.KeeperException;
import org.mockito.ArgumentCaptor;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
 * Tests that invoke the {@link PersistentTopics} (and {@link NonPersistentTopics}) admin resources
 * directly, through Mockito spies wired to the inherited {@code pulsar} service.
 */
@PrepareForTest(PersistentTopics.class)
@PowerMockIgnore("com.sun.management.*")
@Slf4j
@Test(groups = "broker")
public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
// Spied resource under test; re-created before every test method in setup().
private PersistentTopics persistentTopics;
// Tenant, local cluster and namespace names that setup() registers through the admin client.
private final String testTenant = "my-tenant";
private final String testLocalCluster = "use";
private final String testNamespace = "my-namespace";
// Reflective handle on PulsarWebResource's "uri" field, made accessible in initPersistentTopics().
protected Field uriField;
// Mocked UriInfo created once in initPersistentTopics().
protected UriInfo uriInfo;
// Spied non-persistent counterpart of persistentTopics; re-created in setup().
private NonPersistentTopics nonPersistentTopic;
/**
 * One-time class setup: obtains an accessible reflective handle on the {@code uri} field of
 * {@link PulsarWebResource} and creates a mocked {@link UriInfo}.
 *
 * @throws Exception if the {@code uri} field cannot be looked up
 */
@BeforeClass
public void initPersistentTopics() throws Exception {
    final Field declaredUri = PulsarWebResource.class.getDeclaredField("uri");
    uriField = declaredUri;
    // The field is not public, so it has to be opened up before it can be used reflectively.
    declaredUri.setAccessible(true);
    uriInfo = mock(UriInfo.class);
}
/**
 * Per-test setup: runs the base-class setup, builds spied persistent and non-persistent topic
 * resources with their request-context accessors stubbed, and registers the clusters, tenant and
 * namespace used by the tests.
 *
 * @throws Exception if the base setup or any admin call fails
 */
@Override
@BeforeMethod
protected void setup() throws Exception {
    super.internalSetup();

    // Persistent resource: spy so that request-scoped accessors can be stubbed.
    persistentTopics = spy(new PersistentTopics());
    persistentTopics.setServletContext(new MockServletContext());
    persistentTopics.setPulsar(pulsar);
    doReturn(mockZooKeeper).when(persistentTopics).localZk();
    doReturn(false).when(persistentTopics).isRequestHttps();
    doReturn(null).when(persistentTopics).originalPrincipal();
    doReturn("test").when(persistentTopics).clientAppId();
    doReturn(TopicDomain.persistent.value()).when(persistentTopics).domain();
    doNothing().when(persistentTopics).validateAdminAccessForTenant(testTenant);
    doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();

    // Non-persistent resource: same stubbing, different topic domain.
    nonPersistentTopic = spy(new NonPersistentTopics());
    nonPersistentTopic.setServletContext(new MockServletContext());
    nonPersistentTopic.setPulsar(pulsar);
    doReturn(mockZooKeeper).when(nonPersistentTopic).localZk();
    doReturn(false).when(nonPersistentTopic).isRequestHttps();
    doReturn(null).when(nonPersistentTopic).originalPrincipal();
    doReturn("test").when(nonPersistentTopic).clientAppId();
    doReturn(TopicDomain.non_persistent.value()).when(nonPersistentTopic).domain();
    doNothing().when(nonPersistentTopic).validateAdminAccessForTenant(testTenant);
    doReturn(mock(AuthenticationDataHttps.class)).when(nonPersistentTopic).clientAuthData();

    // Clusters, tenant and namespace shared by the tests; both clusters use the same service URL.
    final String brokerUrl = "http://broker-use.com:8080";
    final String secondCluster = "test";
    final String namespace = testTenant + "/" + testNamespace;
    admin.clusters().createCluster(testLocalCluster, new ClusterData(brokerUrl));
    admin.clusters().createCluster(secondCluster, new ClusterData(brokerUrl));
    final TenantInfo tenantInfo = new TenantInfo(
            Sets.newHashSet("role1", "role2"),
            Sets.newHashSet(testLocalCluster, secondCluster));
    admin.tenants().createTenant(testTenant, tenantInfo);
    admin.namespaces().createNamespace(namespace, Sets.newHashSet(testLocalCluster, secondCluster));
}
/**
 * Per-test teardown; delegates to the base-class cleanup and runs even when the test failed.
 *
 * @throws Exception if the base cleanup fails
 */
@Override
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
    super.internalCleanup();
}
/**
 * Walks a partitioned topic through create, subscribe, list, unsubscribe and delete, checking the
 * value each admin call resumes its {@link AsyncResponse} with.
 */
@Test
public void testGetSubscriptions() {
    final String localTopic = "topic-not-found";
    final String firstPartition = localTopic + "-partition-0";
    final int notFound = Response.Status.NOT_FOUND.getStatusCode();
    final int noContent = Response.Status.NO_CONTENT.getStatusCode();

    // 1) Listing subscriptions of a topic that was never created reports NOT_FOUND.
    final AsyncResponse missingTopic = mock(AsyncResponse.class);
    persistentTopics.getSubscriptions(missingTopic, testTenant, testNamespace, localTopic, true);
    final ArgumentCaptor<RestException> missingTopicError = ArgumentCaptor.forClass(RestException.class);
    verify(missingTopic, timeout(5000).times(1)).resume(missingTopicError.capture());
    Assert.assertEquals(missingTopicError.getValue().getResponse().getStatus(), notFound);
    Assert.assertEquals(missingTopicError.getValue().getMessage(), "Topic not found");

    // 2) The same holds for a partition name whose parent partitioned topic does not exist.
    final AsyncResponse missingPartition = mock(AsyncResponse.class);
    persistentTopics.getSubscriptions(missingPartition, testTenant, testNamespace, firstPartition, true);
    final ArgumentCaptor<RestException> missingPartitionError = ArgumentCaptor.forClass(RestException.class);
    verify(missingPartition, timeout(5000).times(1)).resume(missingPartitionError.capture());
    Assert.assertEquals(missingPartitionError.getValue().getResponse().getStatus(), notFound);
    Assert.assertEquals(missingPartitionError.getValue().getMessage(),
            "Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions");

    // 3) Create the partitioned topic with three partitions.
    final AsyncResponse createTopic = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> createTopicResult = ArgumentCaptor.forClass(Response.class);
    persistentTopics.createPartitionedTopic(createTopic, testTenant, testNamespace, localTopic, 3);
    verify(createTopic, timeout(5000).times(1)).resume(createTopicResult.capture());
    Assert.assertEquals(createTopicResult.getValue().getStatus(), noContent);

    // 4) Create a subscription on the partitioned topic.
    final AsyncResponse createSub = mock(AsyncResponse.class);
    persistentTopics.createSubscription(createSub, testTenant, testNamespace, localTopic, "test", true,
            (MessageIdImpl) MessageId.earliest, false);
    final ArgumentCaptor<Response> createSubResult = ArgumentCaptor.forClass(Response.class);
    verify(createSub, timeout(5000).times(1)).resume(createSubResult.capture());
    Assert.assertEquals(createSubResult.getValue().getStatus(), noContent);

    // 5) The subscription is listed on partition 0.
    final AsyncResponse listAfterCreate = mock(AsyncResponse.class);
    persistentTopics.getSubscriptions(listAfterCreate, testTenant, testNamespace, firstPartition, true);
    verify(listAfterCreate, timeout(5000).times(1)).resume(Lists.newArrayList("test"));

    // 6) Delete the subscription.
    final AsyncResponse deleteSub = mock(AsyncResponse.class);
    persistentTopics.deleteSubscription(deleteSub, testTenant, testNamespace, localTopic, "test", false, true);
    final ArgumentCaptor<Response> deleteSubResult = ArgumentCaptor.forClass(Response.class);
    verify(deleteSub, timeout(5000).times(1)).resume(deleteSubResult.capture());
    Assert.assertEquals(deleteSubResult.getValue().getStatus(), noContent);

    // 7) Partition 0 now reports an empty subscription list.
    final AsyncResponse listAfterDelete = mock(AsyncResponse.class);
    persistentTopics.getSubscriptions(listAfterDelete, testTenant, testNamespace, firstPartition, true);
    verify(listAfterDelete, timeout(5000).times(1)).resume(Lists.newArrayList());

    // 8) Delete the partitioned topic.
    final AsyncResponse deleteTopic = mock(AsyncResponse.class);
    persistentTopics.deletePartitionedTopic(deleteTopic, testTenant, testNamespace, localTopic, true, true,
            false);
    final ArgumentCaptor<Response> deleteTopicResult = ArgumentCaptor.forClass(Response.class);
    verify(deleteTopic, timeout(5000).times(1)).resume(deleteTopicResult.capture());
    Assert.assertEquals(deleteTopicResult.getValue().getStatus(), noContent);
}
/**
 * Publishes a few messages and then checks the backlog reported for subscriptions created at the
 * earliest position, at the latest position, and without an explicit position.
 *
 * @throws Exception if topic creation, publishing or the stats lookup fails
 */
@Test
public void testCreateSubscriptions() throws Exception {
    final int numberOfMessages = 5;
    final String testLocalTopicName = "subWithPositionOrNot";
    final String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testLocalTopicName;
    admin.topics().createNonPartitionedTopic(topicName);

    ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName)
            .maxPendingMessages(30000).create();
    // Close the producer even when an assertion below fails, so it is not leaked.
    try {
        // 1) Produce numberOfMessages messages.
        for (int i = 0; i < numberOfMessages; i++) {
            log.info("Published message {}", producer.send(new byte[10]));
        }

        // 2) A subscription created at the earliest position sees every published message.
        createSubscriptionAndAssertBacklog(testLocalTopicName, "sub-earliest",
                (MessageIdImpl) MessageId.earliest, numberOfMessages);

        // 3) A subscription created at the latest position starts with an empty backlog.
        createSubscriptionAndAssertBacklog(testLocalTopicName, "sub-latest",
                (MessageIdImpl) MessageId.latest, 0);

        // 4) A subscription created without a position also starts with an empty backlog.
        createSubscriptionAndAssertBacklog(testLocalTopicName, "sub-none-message-id", null, 0);
    } finally {
        producer.close();
    }
}

/**
 * Creates a subscription through the admin resource, waits for the NO_CONTENT response, and asserts
 * the backlog that the topic stats report for it.
 *
 * @param localTopicName   local name of the topic inside the test namespace
 * @param subscriptionName subscription to create
 * @param position         start position handed to {@code createSubscription}; may be {@code null}
 * @param expectedBacklog  backlog the subscription must report right after creation
 * @throws Exception if the stats lookup fails
 */
private void createSubscriptionAndAssertBacklog(String localTopicName, String subscriptionName,
        MessageIdImpl position, long expectedBacklog) throws Exception {
    AsyncResponse response = mock(AsyncResponse.class);
    persistentTopics.createSubscription(response, testTenant, testNamespace, localTopicName, subscriptionName,
            true, position, false);
    ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
    verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
    Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

    TopicStats topicStats = persistentTopics.getStats(testTenant, testNamespace, localTopicName, true, true,
            false);
    long msgBacklog = topicStats.subscriptions.get(subscriptionName).msgBacklog;
    log.info("Message backlog for {} is: {}", subscriptionName, msgBacklog);
    Assert.assertEquals(msgBacklog, expectedBacklog);
}
/**
 * With the resource's domain stubbed to non-persistent, creating a subscription must resume the
 * response with a BAD_REQUEST error.
 */
@Test
public void testCreateSubscriptionForNonPersistentTopic() throws InterruptedException {
    // Make the persistent resource report the non-persistent domain for this test only.
    doReturn(TopicDomain.non_persistent.value()).when(persistentTopics).domain();

    final String localTopic = "testCreateSubscriptionForNonPersistentTopic";
    final AsyncResponse asyncResponse = mock(AsyncResponse.class);
    final ArgumentCaptor<WebApplicationException> failure = ArgumentCaptor.forClass(RestException.class);

    persistentTopics.createSubscription(asyncResponse, testTenant, testNamespace, localTopic, "sub", true,
            (MessageIdImpl) MessageId.earliest, false);

    verify(asyncResponse, timeout(5000).times(1)).resume(failure.capture());
    final int actualStatus = failure.getValue().getResponse().getStatus();
    Assert.assertEquals(actualStatus, Response.Status.BAD_REQUEST.getStatusCode());
}
/**
 * Creates a single-partition topic with one subscription and checks the message-id list that
 * terminating the partitioned topic resumes the response with.
 */
@Test
public void testTerminatePartitionedTopic() {
    final String localTopic = "topic-not-found";
    final int noContent = Response.Status.NO_CONTENT.getStatusCode();

    // 1) Create the partitioned topic with a single partition.
    final AsyncResponse createTopic = mock(AsyncResponse.class);
    persistentTopics.createPartitionedTopic(createTopic, testTenant, testNamespace, localTopic, 1);
    final ArgumentCaptor<Response> createTopicResult = ArgumentCaptor.forClass(Response.class);
    verify(createTopic, timeout(5000).times(1)).resume(createTopicResult.capture());
    Assert.assertEquals(createTopicResult.getValue().getStatus(), noContent);

    // 2) Create a subscription on it.
    final AsyncResponse createSub = mock(AsyncResponse.class);
    persistentTopics.createSubscription(createSub, testTenant, testNamespace, localTopic, "test", true,
            (MessageIdImpl) MessageId.earliest, false);
    final ArgumentCaptor<Response> createSubResult = ArgumentCaptor.forClass(Response.class);
    verify(createSub, timeout(5000).times(1)).resume(createSubResult.capture());
    Assert.assertEquals(createSubResult.getValue().getStatus(), noContent);

    // 3) Terminate the partitioned topic; one message id per partition is expected back.
    // NOTE(review): the expected ledger id 3 presumably depends on the mocked ledger allocation - confirm.
    final AsyncResponse terminate = mock(AsyncResponse.class);
    persistentTopics.terminatePartitionedTopic(terminate, testTenant, testNamespace, localTopic, true);
    verify(terminate, timeout(5000).times(1)).resume(Arrays.asList(new MessageIdImpl(3, -1, -1)));
}
/**
 * Checks that a non-partitioned topic is not visible through a "-partition-0" name and that its
 * partitioned metadata reports zero partitions.
 */
@Test
public void testNonPartitionedTopics() {
    final String firstTopic = "non-partitioned-topic";
    final String secondTopic = "secondary-non-partitioned-topic";

    // Creating a subscription on the first topic succeeds with NO_CONTENT.
    final AsyncResponse createSub = mock(AsyncResponse.class);
    persistentTopics.createSubscription(createSub, testTenant, testNamespace, firstTopic, "test", true,
            (MessageIdImpl) MessageId.latest, false);
    final ArgumentCaptor<Response> createSubResult = ArgumentCaptor.forClass(Response.class);
    verify(createSub, timeout(5000).times(1)).resume(createSubResult.capture());
    Assert.assertEquals(createSubResult.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

    // Addressing it as partition 0 of a partitioned topic must fail with a "zero partitions" error.
    final AsyncResponse listSubs = mock(AsyncResponse.class);
    persistentTopics.getSubscriptions(listSubs, testTenant, testNamespace, firstTopic + "-partition-0", true);
    final ArgumentCaptor<RestException> listSubsError = ArgumentCaptor.forClass(RestException.class);
    verify(listSubs, timeout(5000).times(1)).resume(listSubsError.capture());
    final String errorMessage = listSubsError.getValue().getMessage();
    Assert.assertTrue(errorMessage.contains("zero partitions"));

    persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, secondTopic, true);

    // NOTE(review): the lookups below query firstTopic rather than the just-created secondTopic -
    // confirm which topic was intended.
    final int partitionsWithoutCheck = persistentTopics
            .getPartitionedMetadata(testTenant, testNamespace, firstTopic, true, false).partitions;
    Assert.assertEquals(partitionsWithoutCheck, 0);
    final int partitionsWithCheck = persistentTopics
            .getPartitionedMetadata(testTenant, testNamespace, firstTopic, true, true).partitions;
    Assert.assertEquals(partitionsWithCheck, 0);
}
/**
 * A non-partitioned topic whose name ends in "-partition-a" can be created and reports zero
 * partitions for both values of the last {@code getPartitionedMetadata} flag.
 */
@Test
public void testCreateNonPartitionedTopic() {
    final String localTopic = "standard-topic-partition-a";
    persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, localTopic, true);

    final PartitionedTopicMetadata withFlagOff =
            persistentTopics.getPartitionedMetadata(testTenant, testNamespace, localTopic, true, false);
    Assert.assertEquals(withFlagOff.partitions, 0);

    final PartitionedTopicMetadata withFlagOn =
            persistentTopics.getPartitionedMetadata(testTenant, testNamespace, localTopic, true, true);
    Assert.assertEquals(withFlagOn.partitions, 0);
}
/**
 * Creating a non-partitioned topic named like partition 10 of "standard-topic" is expected to throw a
 * {@link RestException}; the partitioned-metadata lookup is stubbed to report 10 partitions.
 */
@Test(expectedExceptions = RestException.class)
public void testCreateNonPartitionedTopicWithInvalidName() {
    final String topicName = "standard-topic-partition-10";
    doAnswer(invocation -> {
        TopicName partitionedTopicname = invocation.getArgument(0, TopicName.class);
        // Use a TestNG assertion rather than the Java `assert` keyword, which is skipped
        // when the JVM runs without -ea.
        Assert.assertEquals(partitionedTopicname.getLocalName(), "standard-topic");
        return new PartitionedTopicMetadata(10);
    }).when(persistentTopics).getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean());
    persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
}
/**
 * Covers the case where a topic such as "topic-name-partition-123" already existed before name
 * validation was enforced: creating the partitioned topic "special-topic" must then resume the
 * response with a CONFLICT error.
 */
@Test
public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException {
    final String partitionedTopic = "special-topic";
    // Topics the stubbed managed-ledger list cache reports as already existing.
    final ImmutableSet<String> existingTopics = ImmutableSet.of("standard-topic", "special-topic-partition-123");

    final LocalZooKeeperCacheService localZkCacheService = mock(LocalZooKeeperCacheService.class);
    final ZooKeeperManagedLedgerCache managedLedgerListCache = mock(ZooKeeperManagedLedgerCache.class);
    doReturn(localZkCacheService).when(pulsar).getLocalZkCacheService();
    doReturn(managedLedgerListCache).when(localZkCacheService).managedLedgerListCache();
    doReturn(existingTopics).when(managedLedgerListCache).get(anyString());
    doReturn(CompletableFuture.completedFuture(existingTopics))
            .when(managedLedgerListCache).getAsync(anyString());
    doReturn(new Policies()).when(persistentTopics).getNamespacePolicies(any());

    final AsyncResponse asyncResponse = mock(AsyncResponse.class);
    final ArgumentCaptor<RestException> conflict = ArgumentCaptor.forClass(RestException.class);
    persistentTopics.createPartitionedTopic(asyncResponse, testTenant, testNamespace, partitionedTopic, 5);
    verify(asyncResponse, timeout(5000).times(1)).resume(conflict.capture());

    final int actualStatus = conflict.getValue().getResponse().getStatus();
    Assert.assertEquals(actualStatus, Response.Status.CONFLICT.getStatusCode());
}
/**
 * With a non-partitioned topic "special-topic-partition-10" already present, raising the partition
 * count of "special-topic" to 10 is expected to throw a {@link RestException}.
 */
@Test(expectedExceptions = RestException.class)
public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception {
    final String partitionedTopic = "special-topic";
    // Topic the stubbed managed-ledger list cache reports as already existing.
    final ImmutableSet<String> existingTopics = ImmutableSet.of("special-topic-partition-10");

    final LocalZooKeeperCacheService localZkCacheService = mock(LocalZooKeeperCacheService.class);
    final ZooKeeperManagedLedgerCache managedLedgerListCache = mock(ZooKeeperManagedLedgerCache.class);
    doReturn(localZkCacheService).when(pulsar).getLocalZkCacheService();
    doReturn(managedLedgerListCache).when(localZkCacheService).managedLedgerListCache();
    doReturn(existingTopics).when(managedLedgerListCache).get(anyString());
    doReturn(CompletableFuture.completedFuture(existingTopics))
            .when(managedLedgerListCache).getAsync(anyString());

    // Replace name validation with a stub that only fills in the resource's namespace and topic fields.
    doAnswer(invocation -> {
        persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace");
        persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname");
        return null;
    }).when(persistentTopics).validatePartitionedTopicName(any(), any(), any());
    doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());

    // Creating the topic with 5 partitions succeeds.
    final AsyncResponse createTopic = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> createTopicResult = ArgumentCaptor.forClass(Response.class);
    persistentTopics.createPartitionedTopic(createTopic, testTenant, testNamespace, partitionedTopic, 5);
    verify(createTopic, timeout(5000).times(1)).resume(createTopicResult.capture());
    Assert.assertEquals(createTopicResult.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

    // This call is the one expected to throw.
    persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopic, true, false, 10);
}
/**
 * Unloads a missing topic (NOT_FOUND), a non-partitioned topic and a partitioned topic (both
 * NO_CONTENT), then deletes the partitioned topic.
 */
@Test
public void testUnloadTopic() {
    final String plainTopic = "standard-topic-to-be-unload";
    final String partitionedTopic = "partition-topic-to-be-unload";
    final int noContent = Response.Status.NO_CONTENT.getStatusCode();

    // 1) Unloading a topic that does not exist reports NOT_FOUND.
    final AsyncResponse unloadMissing = mock(AsyncResponse.class);
    persistentTopics.unloadTopic(unloadMissing, testTenant, testNamespace, "topic-not-exist", true);
    final ArgumentCaptor<RestException> unloadMissingError = ArgumentCaptor.forClass(RestException.class);
    verify(unloadMissing, timeout(5000).times(1)).resume(unloadMissingError.capture());
    Assert.assertEquals(unloadMissingError.getValue().getResponse().getStatus(),
            Response.Status.NOT_FOUND.getStatusCode());

    // 2) Create a non-partitioned topic and unload it.
    final AsyncResponse unloadPlain = mock(AsyncResponse.class);
    persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, plainTopic, true);
    persistentTopics.unloadTopic(unloadPlain, testTenant, testNamespace, plainTopic, true);
    final ArgumentCaptor<Response> unloadPlainResult = ArgumentCaptor.forClass(Response.class);
    verify(unloadPlain, timeout(5000).times(1)).resume(unloadPlainResult.capture());
    Assert.assertEquals(unloadPlainResult.getValue().getStatus(), noContent);

    // 3) Create a partitioned topic and unload it.
    final AsyncResponse createPartitioned = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> createPartitionedResult = ArgumentCaptor.forClass(Response.class);
    persistentTopics.createPartitionedTopic(createPartitioned, testTenant, testNamespace, partitionedTopic, 6);
    verify(createPartitioned, timeout(5000).times(1)).resume(createPartitionedResult.capture());
    Assert.assertEquals(createPartitionedResult.getValue().getStatus(), noContent);

    final AsyncResponse unloadPartitioned = mock(AsyncResponse.class);
    persistentTopics.unloadTopic(unloadPartitioned, testTenant, testNamespace, partitionedTopic, true);
    final ArgumentCaptor<Response> unloadPartitionedResult = ArgumentCaptor.forClass(Response.class);
    verify(unloadPartitioned, timeout(5000).times(1)).resume(unloadPartitionedResult.capture());
    Assert.assertEquals(unloadPartitionedResult.getValue().getStatus(), noContent);

    // 4) Delete the partitioned topic.
    final AsyncResponse deletePartitioned = mock(AsyncResponse.class);
    persistentTopics.deletePartitionedTopic(deletePartitioned, testTenant, testNamespace, partitionedTopic,
            true, true, false);
    final ArgumentCaptor<Response> deletePartitionedResult = ArgumentCaptor.forClass(Response.class);
    verify(deletePartitioned, timeout(5000).times(1)).resume(deletePartitionedResult.capture());
    Assert.assertEquals(deletePartitionedResult.getValue().getStatus(), noContent);
}
/**
 * Unloading a topic that was never created must resume the response with a NOT_FOUND error.
 */
@Test
public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
    final AsyncResponse asyncResponse = mock(AsyncResponse.class);
    final ArgumentCaptor<RestException> failure = ArgumentCaptor.forClass(RestException.class);

    persistentTopics.unloadTopic(asyncResponse, testTenant, testNamespace, "non-existent-topic", true);

    verify(asyncResponse, timeout(5000).times(1)).resume(failure.capture());
    final int actualStatus = failure.getValue().getResponse().getStatus();
    Assert.assertEquals(actualStatus, Response.Status.NOT_FOUND.getStatusCode());
}
/**
 * Creates one persistent and one non-persistent partitioned topic in the same namespace and checks
 * that each resource lists exactly one topic, of its own domain.
 */
@Test
public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
    final int noContent = Response.Status.NO_CONTENT.getStatusCode();

    // Persistent partitioned topic.
    final AsyncResponse createPersistent = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> createPersistentResult = ArgumentCaptor.forClass(Response.class);
    persistentTopics.createPartitionedTopic(createPersistent, testTenant, testNamespace, "test-topic1", 3);
    verify(createPersistent, timeout(5000).times(1)).resume(createPersistentResult.capture());
    Assert.assertEquals(createPersistentResult.getValue().getStatus(), noContent);

    // Non-persistent partitioned topic.
    final AsyncResponse createNonPersistent = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> createNonPersistentResult = ArgumentCaptor.forClass(Response.class);
    nonPersistentTopic.createPartitionedTopic(createNonPersistent, testTenant, testNamespace, "test-topic2", 3);
    verify(createNonPersistent, timeout(5000).times(1)).resume(createNonPersistentResult.capture());
    Assert.assertEquals(createNonPersistentResult.getValue().getStatus(), noContent);

    // Each resource lists only the topic of its own domain.
    final List<String> persistentList = persistentTopics.getPartitionedTopicList(testTenant, testNamespace);
    Assert.assertEquals(persistentList.size(), 1);
    final String persistentDomain = TopicName.get(persistentList.get(0)).getDomain().value();
    Assert.assertEquals(persistentDomain, TopicDomain.persistent.value());

    final List<String> nonPersistentList = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace);
    Assert.assertEquals(nonPersistentList.size(), 1);
    final String nonPersistentDomain = TopicName.get(nonPersistentList.get(0)).getDomain().value();
    Assert.assertEquals(nonPersistentDomain, TopicDomain.non_persistent.value());
}
/**
 * Granting the produce action to a role on a non-partitioned topic must be reflected in the topic's
 * permissions.
 */
@Test
public void testGrantNonPartitionedTopic() {
    final String localTopic = "non-partitioned-topic";
    final String grantedRole = "role";
    final Set<AuthAction> grantedActions = Sets.newHashSet(AuthAction.produce);

    persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, localTopic, true);
    persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, localTopic, grantedRole, grantedActions);

    final Map<String, Set<AuthAction>> topicPermissions =
            persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, localTopic);
    Assert.assertEquals(topicPermissions.get(grantedRole), grantedActions);
}
/**
 * Creating a non-partitioned topic whose name equals partition 0 of an existing partitioned topic
 * must be rejected with a CONFLICT error.
 */
@Test
public void testCreateExistedPartition() {
    final AsyncResponse response = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
    final String topicName = "test-create-existed-partition";
    persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3);
    // Wait until the partitioned topic has actually been created; otherwise the conflicting create
    // below can race with it.
    verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
    Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

    final String partitionName = TopicName.get(topicName).getPartition(0).getLocalName();
    try {
        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, partitionName, false);
        Assert.fail("Creating " + partitionName + " should have been rejected as a conflict");
    } catch (RestException e) {
        // Expected path: the partition name is already taken by the partitioned topic.
        log.info("Failed to create {}: {}", partitionName, e.getMessage());
        Assert.assertEquals(e.getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
        Assert.assertEquals(e.getMessage(), "This topic already exists");
    }
}
/**
 * Granting the produce action on a partitioned topic must be visible on the partitioned topic and on
 * every one of its partitions.
 */
@Test
public void testGrantPartitionedTopic() {
    final String localTopic = "partitioned-topic";
    final int partitionCount = 5;
    final String grantedRole = "role";
    final Set<AuthAction> grantedActions = Sets.newHashSet(AuthAction.produce);

    // Create the partitioned topic and wait for the NO_CONTENT response.
    final AsyncResponse createTopic = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> createTopicResult = ArgumentCaptor.forClass(Response.class);
    persistentTopics.createPartitionedTopic(createTopic, testTenant, testNamespace, localTopic, partitionCount);
    verify(createTopic, timeout(5000).times(1)).resume(createTopicResult.capture());
    Assert.assertEquals(createTopicResult.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

    persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, localTopic, grantedRole, grantedActions);

    // Permission is reported on the partitioned topic itself...
    final Map<String, Set<AuthAction>> topicPermissions =
            persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, localTopic);
    Assert.assertEquals(topicPermissions.get(grantedRole), grantedActions);

    // ...and on each individual partition.
    final TopicName fullName =
            TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, localTopic);
    for (int index = 0; index < partitionCount; index++) {
        final String partitionLocalName = fullName.getPartition(index).getEncodedLocalName();
        final Map<String, Set<AuthAction>> partitionPermissions =
                persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, partitionLocalName);
        Assert.assertEquals(partitionPermissions.get(grantedRole), grantedActions);
    }
}
/**
 * After granting and then revoking a role on a non-partitioned topic, the role must no longer
 * appear in the topic's permissions.
 */
@Test
public void testRevokeNonPartitionedTopic() {
    final String localTopic = "non-partitioned-topic";
    final String revokedRole = "role";
    final Set<AuthAction> grantedActions = Sets.newHashSet(AuthAction.produce);

    persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, localTopic, true);
    persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, localTopic, revokedRole, grantedActions);
    persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, localTopic, revokedRole);

    final Map<String, Set<AuthAction>> topicPermissions =
            persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, localTopic);
    Assert.assertNull(topicPermissions.get(revokedRole));
}
/**
 * After granting and then revoking a role on a partitioned topic, the role must be absent from the
 * permissions of the partitioned topic and of every partition.
 */
@Test
public void testRevokePartitionedTopic() {
    final String localTopic = "partitioned-topic";
    final int partitionCount = 5;
    final String revokedRole = "role";
    final Set<AuthAction> grantedActions = Sets.newHashSet(AuthAction.produce);

    // Create the partitioned topic and wait for the NO_CONTENT response.
    final AsyncResponse createTopic = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> createTopicResult = ArgumentCaptor.forClass(Response.class);
    persistentTopics.createPartitionedTopic(createTopic, testTenant, testNamespace, localTopic, partitionCount);
    verify(createTopic, timeout(5000).times(1)).resume(createTopicResult.capture());
    Assert.assertEquals(createTopicResult.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

    persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, localTopic, revokedRole, grantedActions);
    persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, localTopic, revokedRole);

    // The role is gone from the partitioned topic itself...
    final Map<String, Set<AuthAction>> topicPermissions =
            persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, localTopic);
    Assert.assertNull(topicPermissions.get(revokedRole));

    // ...and from each individual partition.
    final TopicName fullName =
            TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, localTopic);
    for (int index = 0; index < partitionCount; index++) {
        final String partitionLocalName = fullName.getPartition(index).getEncodedLocalName();
        final Map<String, Set<AuthAction>> partitionPermissions =
                persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, partitionLocalName);
        Assert.assertNull(partitionPermissions.get(revokedRole));
    }
}
/**
 * Triggers compaction on a missing topic (NOT_FOUND), on a non-partitioned topic and on a
 * partitioned topic (NO_CONTENT).
 */
@Test
public void testTriggerCompactionTopic() {
    final String partitionedTopic = "test-part";
    final String plainTopic = "test-non-part";
    final int noContent = Response.Status.NO_CONTENT.getStatusCode();

    // Compaction on a topic that does not exist reports NOT_FOUND.
    final AsyncResponse compactMissing = mock(AsyncResponse.class);
    persistentTopics.compact(compactMissing, testTenant, testNamespace, "non-existing-topic", true);
    final ArgumentCaptor<RestException> compactMissingError = ArgumentCaptor.forClass(RestException.class);
    verify(compactMissing, timeout(5000).times(1)).resume(compactMissingError.capture());
    Assert.assertEquals(compactMissingError.getValue().getResponse().getStatus(),
            Response.Status.NOT_FOUND.getStatusCode());

    // Create a non-partitioned topic and trigger compaction on it.
    final AsyncResponse compactPlain = mock(AsyncResponse.class);
    persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, plainTopic, true);
    persistentTopics.compact(compactPlain, testTenant, testNamespace, plainTopic, true);
    final ArgumentCaptor<Response> compactPlainResult = ArgumentCaptor.forClass(Response.class);
    verify(compactPlain, timeout(5000).times(1)).resume(compactPlainResult.capture());
    Assert.assertEquals(compactPlainResult.getValue().getStatus(), noContent);

    // Create a partitioned topic and trigger compaction on it.
    // NOTE(review): the same AsyncResponse mock is handed to both createPartitionedTopic and compact,
    // so the single-resume verification below may be satisfied by either call - confirm this is intended.
    final AsyncResponse createAndCompact = mock(AsyncResponse.class);
    persistentTopics.createPartitionedTopic(createAndCompact, testTenant, testNamespace, partitionedTopic, 2);
    persistentTopics.compact(createAndCompact, testTenant, testNamespace, partitionedTopic, true);
    final ArgumentCaptor<Response> createAndCompactResult = ArgumentCaptor.forClass(Response.class);
    verify(createAndCompact, timeout(5000).times(1)).resume(createAndCompactResult.capture());
    Assert.assertEquals(createAndCompactResult.getValue().getStatus(), noContent);
}
/**
 * Publishes 100 messages to a three-partition topic and peeks five messages from partition 0 using
 * a subscription name that this test never creates.
 *
 * @throws Exception if topic creation, publishing or peeking fails
 */
@Test
public void testPeekWithSubscriptionNameNotExist() throws Exception {
    final String topicName = "testTopic";
    final String topic = TopicName.get(
            TopicDomain.persistent.value(),
            testTenant,
            testNamespace,
            topicName).toString();
    final String subscriptionName = "sub";
    admin.topics().createPartitionedTopic(topic, 3);
    final String partitionedTopic = topic + "-partition-0";

    Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
    // Close the producer even when publishing, peeking or the assertion fails.
    try {
        for (int i = 0; i < 100; ++i) {
            producer.send("test" + i);
        }
        List<Message<byte[]>> messages = admin.topics()
                .peekMessages(partitionedTopic, subscriptionName, 5);
        Assert.assertEquals(messages.size(), 5);
    } finally {
        producer.close();
    }
}
/**
 * Checks the type of id returned by {@code getLastMessageId}: after 10 messages are published
 * through a batching producer the last id is expected to be a {@code BatchMessageIdImpl} with
 * batch index 9, and after one more message from a non-batching producer it is expected to be a
 * {@code MessageIdImpl}.
 *
 * @throws Exception if any admin or client operation fails
 */
@Test
public void testGetLastMessageId() throws Exception {
    TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
    admin.tenants().createTenant("prop-xyz", tenantInfo);
    admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test"));
    final String topicName = "persistent://prop-xyz/ns1/testGetLastMessageId";
    admin.topics().createNonPartitionedTopic(topicName);

    // Both producers are opened with try-with-resources so they are closed even if an
    // assertion fails; previously neither producer was ever closed.
    try (Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName)
            .enableBatching(true)
            .batchingMaxMessages(100)
            .batchingMaxPublishDelay(2, TimeUnit.SECONDS)
            .create()) {
        admin.topics().createSubscription(topicName, "test", MessageId.earliest);

        // Keep only the future of the most recent send; waiting on it below is what the
        // batch-index assertion relies on.
        CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
        for (int i = 0; i < 10; i++) {
            completableFuture = batchProducer.sendAsync("test".getBytes());
        }
        completableFuture.get();
        Assert.assertEquals(
                ((BatchMessageIdImpl) admin.topics().getLastMessageId(topicName)).getBatchIndex(), 9);

        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                .enableBatching(false)
                .create()) {
            producer.send("test".getBytes());
            Assert.assertTrue(admin.topics().getLastMessageId(topicName) instanceof MessageIdImpl);
        }
    }
}
/**
 * Exercises {@code examineMessage}: the call on the partitioned topic itself must be rejected
 * with the expected error message, while on partition 0 the messages must be retrievable by
 * position counted from either the "earliest" or the "latest" end.
 *
 * @throws Exception if any admin or client operation fails unexpectedly
 */
@Test
public void testExamineMessage() throws Exception {
    TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
    admin.tenants().createTenant("tenant-xyz", tenantInfo);
    admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
    final String topicName = "persistent://tenant-xyz/ns-abc/topic-123";
    final String partition0 = topicName + "-partition-0";
    final int totalMessages = 5;
    admin.topics().createPartitionedTopic(topicName, 2);

    // try-with-resources so the producer is closed even when an assertion fails.
    try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(partition0).create()) {
        // Check examine message not allowed on partitioned topic.
        try {
            admin.topics().examineMessage(topicName, "earliest", 1);
            // Without this the test would pass silently if no exception were thrown.
            Assert.fail("examineMessage on a partitioned topic should have failed");
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getMessage(),
                    "Examine messages on a partitioned topic is not allowed, "
                            + "please try examine message on specific topic partition");
        }

        // With a single message, position 1 from either end is that message.
        producer.send("message1");
        Assert.assertEquals(
                new String(admin.topics().examineMessage(partition0, "earliest", 1).getData()), "message1");
        Assert.assertEquals(
                new String(admin.topics().examineMessage(partition0, "latest", 1).getData()), "message1");

        for (int i = 2; i <= totalMessages; i++) {
            producer.send("message" + i);
        }

        // Counting from the earliest end, position i is the i-th message sent.
        for (int position = 1; position <= totalMessages; position++) {
            Assert.assertEquals(
                    new String(admin.topics().examineMessage(partition0, "earliest", position).getData()),
                    "message" + position);
        }
        // Counting from the latest end, position 1 is the last message sent, position 5 the first.
        for (int position = 1; position <= totalMessages; position++) {
            Assert.assertEquals(
                    new String(admin.topics().examineMessage(partition0, "latest", position).getData()),
                    "message" + (totalMessages + 1 - position));
        }
    }
}
/**
 * Creates a non-partitioned topic and triggers an offload on it with {@code null} as the last
 * argument (the message id, going by the test name); the call must be rejected with a
 * {@code RestException} carrying 400 Bad Request.
 */
@Test
public void testOffloadWithNullMessageId() {
    final String localTopic = "topic-123";
    final int expectedStatus = Response.Status.BAD_REQUEST.getStatusCode();

    persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, localTopic, true);

    try {
        persistentTopics.triggerOffload(testTenant, testNamespace, localTopic, true, null);
        // Reaching this line means the request was not rejected.
        Assert.fail("should have failed");
    } catch (RestException rejected) {
        final int actualStatus = rejected.getResponse().getStatus();
        Assert.assertEquals(actualStatus, expectedStatus);
    }
}
}