blob: 1f755234009bade6847bf1a4c6805f285f18cd1d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.admin.v2.ExtPersistentTopics;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.zookeeper.KeeperException;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.internal.util.MockUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-admin")
public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
private PersistentTopics persistentTopics;
private ExtPersistentTopics extPersistentTopics;
private final String testTenant = "my-tenant";
private final String testLocalCluster = "use";
private final String testNamespace = "my-namespace";
private final String testNamespaceLocal = "my-namespace-local";
protected Field uriField;
protected UriInfo uriInfo;
private NonPersistentTopics nonPersistentTopic;
private NamespaceResources namespaceResources;
@BeforeClass
public void initPersistentTopics() throws Exception {
uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
uriInfo = mock(UriInfo.class);
}
@Override
@BeforeMethod
protected void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(false);
super.internalSetup();
persistentTopics = spy(PersistentTopics.class);
persistentTopics.setServletContext(new MockServletContext());
persistentTopics.setPulsar(pulsar);
doReturn(false).when(persistentTopics).isRequestHttps();
doReturn(null).when(persistentTopics).originalPrincipal();
doReturn("test").when(persistentTopics).clientAppId();
doReturn(TopicDomain.persistent.value()).when(persistentTopics).domain();
doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();
extPersistentTopics = spy(ExtPersistentTopics.class);
extPersistentTopics.setServletContext(new MockServletContext());
extPersistentTopics.setPulsar(pulsar);
doReturn(false).when(extPersistentTopics).isRequestHttps();
doReturn(null).when(extPersistentTopics).originalPrincipal();
doReturn("test").when(extPersistentTopics).clientAppId();
doReturn(TopicDomain.persistent.value()).when(extPersistentTopics).domain();
doNothing().when(extPersistentTopics).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(extPersistentTopics).clientAuthData();
nonPersistentTopic = spy(NonPersistentTopics.class);
nonPersistentTopic.setServletContext(new MockServletContext());
nonPersistentTopic.setPulsar(pulsar);
namespaceResources = mock(NamespaceResources.class);
doReturn(false).when(nonPersistentTopic).isRequestHttps();
doReturn(null).when(nonPersistentTopic).originalPrincipal();
doReturn("test").when(nonPersistentTopic).clientAppId();
doReturn(TopicDomain.non_persistent.value()).when(nonPersistentTopic).domain();
doNothing().when(nonPersistentTopic).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(nonPersistentTopic).clientAuthData();
PulsarResources resources =
spy(new PulsarResources(pulsar.getLocalMetadataStore(), pulsar.getConfigurationMetadataStore()));
doReturn(spy(new TopicResources(pulsar.getLocalMetadataStore()))).when(resources).getTopicResources();
doReturn(resources).when(pulsar).getPulsarResources();
admin.clusters().createCluster("use", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
admin.tenants().createTenant(this.testTenant,
new TenantInfoImpl(Set.of("role1", "role2"), Set.of(testLocalCluster, "test")));
admin.tenants().createTenant("pulsar",
new TenantInfoImpl(Set.of("role1", "role2"), Set.of(testLocalCluster, "test")));
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Set.of(testLocalCluster, "test"));
admin.namespaces().createNamespace("pulsar/system", 4);
admin.namespaces().createNamespace(testTenant + "/" + testNamespaceLocal);
}
@Override
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test
public void testGetSubscriptions() {
String testLocalTopicName = "topic-not-found";
// 1) Confirm that the topic does not exist
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName, true);
ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
Assert.assertEquals(errorCaptor.getValue().getMessage(), String.format("Topic %s not found",
"persistent://my-tenant/my-namespace/topic-not-found"));
// 2) Confirm that the partitioned topic does not exist
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0",
true);
errorCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
Assert.assertEquals(errorCaptor.getValue().getMessage(),
"Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has "
+ "zero partitions");
// Confirm that the namespace does not exist
String notExistNamespace = "not-exist-namespace";
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, notExistNamespace, testLocalTopicName,
true);
errorCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
Assert.assertEquals(errorCaptor.getValue().getMessage(), "Namespace not found");
// 3) Create the partitioned topic
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 4) Create a subscription
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true,
new ResetCursorData(MessageId.earliest), false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 5) Confirm that the subscription exists
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0",
true);
verify(response, timeout(5000).times(1)).resume(List.of("test"));
// 6) Delete the subscription
response = mock(AsyncResponse.class);
persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", false,
true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 7) Confirm that the subscription does not exist
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0",
true);
verify(response, timeout(5000).times(1)).resume(new ArrayList<>());
// 8) Create a sub of partitioned-topic
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName + "-partition-1",
"test", true,
new ResetCursorData(MessageId.earliest), false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
//
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-1",
true);
verify(response, timeout(5000).times(1)).resume(List.of("test"));
//
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0",
true);
verify(response, timeout(5000).times(1)).resume(new ArrayList<>());
//
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName, true);
verify(response, timeout(5000).times(1)).resume(List.of("test"));
// 9) Delete the partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}
@Test
public void testCreateSubscriptions() throws Exception {
final int numberOfMessages = 5;
final String SUB_EARLIEST = "sub-earliest";
final String SUB_LATEST = "sub-latest";
final String SUB_NONE_MESSAGE_ID = "sub-none-message-id";
String testLocalTopicName = "subWithPositionOrNot";
final String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testLocalTopicName;
admin.topics().createNonPartitionedTopic(topicName);
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName)
.maxPendingMessages(30000).create();
// 1) produce numberOfMessages message to pulsar
for (int i = 0; i < numberOfMessages; i++) {
log.info("Produce messages: " + producer.send(new byte[10]).toString());
}
// 2) Create a subscription from earliest position
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, SUB_EARLIEST, true,
new ResetCursorData(MessageId.earliest), false);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false);
ArgumentCaptor<TopicStats> statCaptor = ArgumentCaptor.forClass(TopicStats.class);
verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
TopicStats topicStats = statCaptor.getValue();
long msgBacklog = topicStats.getSubscriptions().get(SUB_EARLIEST).getMsgBacklog();
log.info("Message back log for " + SUB_EARLIEST + " is :" + msgBacklog);
Assert.assertEquals(msgBacklog, numberOfMessages);
// 3) Create a subscription with form latest position
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, SUB_LATEST, true,
new ResetCursorData(MessageId.latest), false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false);
statCaptor = ArgumentCaptor.forClass(TopicStats.class);
verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
topicStats = statCaptor.getValue();
msgBacklog = topicStats.getSubscriptions().get(SUB_LATEST).getMsgBacklog();
log.info("Message back log for " + SUB_LATEST + " is :" + msgBacklog);
Assert.assertEquals(msgBacklog, 0);
// 4) Create a subscription without position
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName,
SUB_NONE_MESSAGE_ID, true,
null, false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false);
statCaptor = ArgumentCaptor.forClass(TopicStats.class);
verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
topicStats = statCaptor.getValue();
msgBacklog = topicStats.getSubscriptions().get(SUB_NONE_MESSAGE_ID).getMsgBacklog();
log.info("Message back log for " + SUB_NONE_MESSAGE_ID + " is :" + msgBacklog);
Assert.assertEquals(msgBacklog, 0);
// 5) Create replicated subscription
response = mock(AsyncResponse.class);
String replicateSubName = "sub-none-message-id-replicated-sub";
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, replicateSubName,
true,
null, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false);
statCaptor = ArgumentCaptor.forClass(TopicStats.class);
verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
topicStats = statCaptor.getValue();
Assert.assertNotNull(topicStats.getSubscriptions().get(replicateSubName));
Assert.assertTrue(topicStats.getSubscriptions().get(replicateSubName).isReplicated());
producer.close();
}
@Test
public void testCreateSubscriptionForNonPersistentTopic() throws InterruptedException {
doReturn(TopicDomain.non_persistent.value()).when(persistentTopics).domain();
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<WebApplicationException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createSubscription(response, testTenant, testNamespace,
"testCreateSubscriptionForNonPersistentTopic", "sub",
true, new ResetCursorData(MessageId.earliest), false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(),
Response.Status.BAD_REQUEST.getStatusCode());
}
@Test
public void testTerminatePartitionedTopic() {
String testLocalTopicName = "topic-not-found";
// 3) Create the partitioned topic
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1, true);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 5) Create a subscription
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true,
new ResetCursorData(MessageId.earliest), false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 9) terminate partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.terminatePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true);
Map<Integer, MessageId> messageIds = new ConcurrentHashMap<>();
messageIds.put(0, new MessageIdImpl(3, -1, -1));
verify(response, timeout(5000).times(1)).resume(messageIds);
}
@Test
public void testTerminate() {
String testLocalTopicName = "topic-not-found";
// 1) Create the nonPartitionTopic topic
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, null);
// 2) Create a subscription
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true,
new ResetCursorData(MessageId.earliest), false);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 3) Assert terminate persistent topic
response = mock(AsyncResponse.class);
persistentTopics.terminate(response, testTenant, testNamespace, testLocalTopicName, true);
MessageId messageId = new MessageIdImpl(3, -1, -1);
verify(response, timeout(5000).times(1)).resume(messageId);
// 4) Assert terminate non-persistent topic
String nonPersistentTopicName = "non-persistent-topic";
try {
nonPersistentTopic.terminate(response, testTenant, testNamespace, nonPersistentTopicName, true);
Assert.fail("Should fail validation on non-persistent topic");
} catch (RestException e) {
Assert.assertEquals(Response.Status.NOT_ACCEPTABLE.getStatusCode(), e.getResponse().getStatus());
}
}
@Test
public void testNonPartitionedTopics() {
final String nonPartitionTopic = BrokerTestUtil.newUniqueName("non-partitioned-topic");
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, nonPartitionTopic, "test", true,
new ResetCursorData(MessageId.latest), false);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, nonPartitionTopic + "-partition-0",
true);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertTrue(errorCaptor.getValue().getMessage().contains("zero partitions"));
response = mock(AsyncResponse.class);
final String nonPartitionTopic2 = BrokerTestUtil.newUniqueName("secondary-non-partitioned-topic");
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, nonPartitionTopic2, true, null);
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(admin.topics().getList(testTenant + "/" + testNamespace)
.contains("persistent://" + testTenant + "/" + testNamespace + "/" + nonPartitionTopic2));
});
AsyncResponse metaResponse = mock(AsyncResponse.class);
ArgumentCaptor<PartitionedTopicMetadata> metaResponseCaptor =
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
persistentTopics.getPartitionedMetadata(metaResponse, testTenant, testNamespace, nonPartitionTopic, true,
false);
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
metaResponse = mock(AsyncResponse.class);
metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
persistentTopics.getPartitionedMetadata(metaResponse, testTenant, testNamespace, nonPartitionTopic, true, true);
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
}
@Test
public void testCreateNonPartitionedTopic() {
final String topic = "testCreateNonPartitionedTopic-a";
TopicName topicName = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic);
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topic, true, null);
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(admin.topics().getList(testTenant + "/" + testNamespace).contains(topicName.toString()));
});
AsyncResponse metaResponse = mock(AsyncResponse.class);
ArgumentCaptor<PartitionedTopicMetadata> metaResponseCaptor =
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
persistentTopics.getPartitionedMetadata(metaResponse, testTenant, testNamespace, topic, true, false);
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
metaResponse = mock(AsyncResponse.class);
metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
persistentTopics.getPartitionedMetadata(metaResponse,
testTenant, testNamespace, topic, true, true);
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
response = mock(AsyncResponse.class);
metaResponse = mock(AsyncResponse.class);
metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
final String topic2 = "testCreateNonPartitionedTopic-b";
TopicName topicName2 = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic2);
Map<String, String> topicMetadata = new HashMap<>();
topicMetadata.put("key1", "value1");
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topic2, true, topicMetadata);
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(admin.topics().getList(testTenant + "/" + testNamespace).contains(topicName2.toString()));
});
persistentTopics.getPartitionedMetadata(metaResponse,
testTenant, testNamespace, topic2, true, false);
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
Assert.assertNull(metaResponseCaptor.getValue().properties);
metaResponse = mock(AsyncResponse.class);
ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
persistentTopics.getProperties(metaResponse,
testTenant, testNamespace, topic2, true);
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
Assert.assertNotNull(metaResponseCaptor2.getValue());
Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
}
@Test
public void testCreatePartitionedTopic() {
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<PartitionedTopicMetadata> responseCaptor =
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
final String topicName = "standard-partitioned-topic-a";
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
Awaitility.await().untilAsserted(() -> {
persistentTopics.getPartitionedMetadata(response,
testTenant, testNamespace, topicName, true, false);
verify(response, timeout(5000).atLeast(1)).resume(responseCaptor.capture());
Assert.assertNull(responseCaptor.getValue().properties);
});
AsyncResponse response2 = mock(AsyncResponse.class);
ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 =
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
final String topicName2 = "standard-partitioned-topic-b";
Map<String, String> topicMetadata = new HashMap<>();
topicMetadata.put("key1", "value1");
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
extPersistentTopics.createPartitionedTopic(response2, testTenant, testNamespace, topicName2, metadata, true);
Awaitility.await().untilAsserted(() -> {
persistentTopics.getPartitionedMetadata(response2,
testTenant, testNamespace, topicName2, true, false);
verify(response2, timeout(5000).atLeast(1)).resume(responseCaptor2.capture());
Assert.assertEquals(responseCaptor2.getValue().properties.size(), 1);
Assert.assertEquals(responseCaptor2.getValue().properties, topicMetadata);
});
AsyncResponse response3 = mock(AsyncResponse.class);
ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
persistentTopics.getProperties(response3, testTenant, testNamespace, topicName2, true);
verify(response3, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
Assert.assertNotNull(metaResponseCaptor2.getValue());
Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
}
@Test
public void testCreateTopicWithReplicationCluster() {
final String topicName = "test-topic-ownership";
NamespaceName namespaceName = NamespaceName.get(testTenant, testNamespace);
CompletableFuture<Optional<Policies>> policyFuture = new CompletableFuture<>();
Policies policies = new Policies();
policyFuture.complete(Optional.of(policies));
when(pulsar.getPulsarResources().getNamespaceResources()).thenReturn(namespaceResources);
doReturn(policyFuture).when(namespaceResources).getPoliciesAsync(namespaceName);
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(),
Response.Status.PRECONDITION_FAILED.getStatusCode());
Assert.assertTrue(
errCaptor.getValue().getMessage().contains("Namespace does not have any clusters configured"));
// Test policy not exist and return 'Namespace not found'
CompletableFuture<Optional<Policies>> policyFuture2 = new CompletableFuture<>();
policyFuture2.complete(Optional.empty());
doReturn(policyFuture2).when(namespaceResources).getPoliciesAsync(namespaceName);
response = mock(AsyncResponse.class);
errCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace not found"));
}
@Test
public void testCreateNonPartitionedTopicWithInvalidName() {
final String topicName = "standard-topic-partition-10";
doAnswer(invocation -> {
TopicName partitionedTopicName = invocation.getArgument(0, TopicName.class);
assert (partitionedTopicName.getLocalName().equals("standard-topic"));
return new PartitionedTopicMetadata(10);
}).when(persistentTopics).getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean());
final AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(),
Response.Status.PRECONDITION_FAILED.getStatusCode());
}
@Test
public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix()
throws KeeperException, InterruptedException {
// Test the case in which user already has topic like topic-name-partition-123 created before we enforce the
// validation.
final String nonPartitionTopicName1 = "standard-topic";
final String nonPartitionTopicName2 = "special-topic-partition-123";
final String partitionedTopicName = "special-topic";
when(pulsar.getPulsarResources().getTopicResources()
.listPersistentTopicsAsync(NamespaceName.get("my-tenant/my-namespace")))
.thenReturn(CompletableFuture.completedFuture(List.of(
"persistent://my-tenant/my-namespace/" + nonPartitionTopicName1,
"persistent://my-tenant/my-namespace/" + nonPartitionTopicName2
)));
// doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache)
// .get(anyString());
// doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2))
// ).when(mockZooKeeperChildrenCache).getAsync(anyString());
doReturn(new Policies()).when(persistentTopics).getNamespacePolicies(any());
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
}
@Test
public void testUpdatePartitionedTopicHavingProperties() throws Exception {
final String tenant = "tenant-testUpdatePartitionedTopicHavingProperties";
final String namespace = "ns-testUpdatePartitionedTopicHavingProperties";
final String topic = "topic-testUpdatePartitionedTopicHavingProperties";
Map<String, String> topicMetadata = new HashMap<>();
topicMetadata.put("key1", "value1");
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant(tenant, tenantInfo);
admin.namespaces().createNamespace(tenant + "/" + namespace, Set.of("test"));
// create a 2 partition topic with properties key1->value1
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<PartitionedTopicMetadata> responseCaptor =
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
extPersistentTopics.createPartitionedTopic(response, tenant, namespace, topic, metadata, true);
Awaitility.await().untilAsserted(() -> {
persistentTopics.getPartitionedMetadata(response,
tenant, namespace, topic, true, false);
verify(response, timeout(5000).atLeast(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().properties.size(), 1);
Assert.assertEquals(responseCaptor.getValue().properties, topicMetadata);
});
// update partition to 5
final int updatedPartition = 5;
AsyncResponse response2 = mock(AsyncResponse.class);
ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 =
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
persistentTopics.updatePartitionedTopic(response2, tenant, namespace, topic, false, false, false, updatedPartition);
Awaitility.await().untilAsserted(() -> {
persistentTopics.getPartitionedMetadata(response2,
tenant, namespace, topic, true, false);
verify(response2, timeout(5000).atLeast(1)).resume(responseCaptor2.capture());
Assert.assertEquals(responseCaptor2.getValue().partitions, updatedPartition);
Assert.assertEquals(responseCaptor2.getValue().properties.size(), 1);
Assert.assertEquals(responseCaptor2.getValue().properties, topicMetadata);
});
}
@Test
public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception {
// Already have non partition topic special-topic-partition-10, shouldn't able to update number of
// partitioned topic to more than 10.
final String nonPartitionTopicName2 = "special-topic-partition-10";
final String partitionedTopicName = "special-topic";
pulsar.getBrokerService().getManagedLedgerFactory()
.open(TopicName.get(nonPartitionTopicName2).getPersistenceNamingEncoding());
doAnswer(invocation -> {
persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace");
persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname");
return null;
}).when(persistentTopics).validatePartitionedTopicName(any(), any(), any());
doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.updatePartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, false, false,
false,
10);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}
@Test(timeOut = 10_000)
public void testUnloadTopic() {
final String topicName = "standard-topic-to-be-unload";
final String partitionTopicName = "partition-topic-to-be-unload";
// 1) not exist topic
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.unloadTopic(response, testTenant, testNamespace, "topic-not-exist", true);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(45_000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
// 2) create non partitioned topic and unload
response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.unloadTopic(response, testTenant, testNamespace, topicName, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 3) create partitioned topic and unload
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 4) delete partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}
@Test(timeOut = 10_000)
public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.unloadTopic(response, testTenant, testNamespace, "non-existent-topic", true);
ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(45_000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
}
@Test
public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "__change_events", 3, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
List<String> persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(persistentPartitionedTopics.size(), 1);
Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(),
TopicDomain.persistent.value());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(persistentPartitionedTopics.size(), 2);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
nonPersistentTopic.getPartitionedTopicList(response, testTenant, testNamespace, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
List<String> nonPersistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(),
TopicDomain.non_persistent.value());
}
@Test
public void testGetList() throws Exception {
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic-1", 1, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "__change_events", 1, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getList(response, testTenant, testNamespace, null, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
List<String> topics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(topics.size(), 1);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getList(response, testTenant, testNamespace, null, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
topics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(topics.size(), 2);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
nonPersistentTopic.createNonPartitionedTopic(response, testTenant, testNamespace, "test-topic-2", false, null);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
nonPersistentTopic.createNonPartitionedTopic(response, testTenant, testNamespace, "__change_events", false,
null);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
nonPersistentTopic.getList(response, testTenant, testNamespace, null, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
topics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(topics.size(), 1);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
nonPersistentTopic.getList(response, testTenant, testNamespace, null, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
topics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(topics.size(), 2);
}
@Test
public void testGrantNonPartitionedTopic() {
final String topicName = "non-partitioned-topic";
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPermissionsOnTopic(response, testTenant, testNamespace, topicName);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Map<String, Set<AuthAction>> permissions = (Map<String, Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(permissions.get(role), expectActions);
}
@Test
public void testCreateExistedPartition() {
final AsyncResponse response = mock(AsyncResponse.class);
final String topicName = "test-create-existed-partition";
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3, true);
final String partitionName = TopicName.get(topicName).getPartition(0).getLocalName();
ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, partitionName, false, null);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(),
Response.Status.CONFLICT.getStatusCode());
}
@Test
public void testGrantPartitionedTopic() {
final String partitionedTopicName = "partitioned-topic";
final int numPartitions = 5;
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(
response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Map<String, Set<AuthAction>> permissions = (Map<String, Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(permissions.get(role), expectActions);
}
@Test
public void testRevokeNonPartitionedTopic() {
final String topicName = "non-partitioned-topic";
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, topicName, role);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPermissionsOnTopic(response, testTenant, testNamespace, topicName);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Map<String, Set<AuthAction>> permissions = (Map<String, Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(permissions.get(role), null);
}
@Test
public void testRevokePartitionedTopic() {
final String partitionedTopicName = "partitioned-topic";
final int numPartitions = 5;
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(
response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Map<String, Set<AuthAction>> permissions = (Map<String, Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(permissions.get(role), null);
TopicName topicName = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace,
partitionedTopicName);
for (int i = 0; i < numPartitions; i++) {
TopicName partition = topicName.getPartition(i);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPermissionsOnTopic(response, testTenant, testNamespace,
partition.getEncodedLocalName());
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Map<String, Set<AuthAction>> partitionPermissions =
(Map<String, Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(partitionPermissions.get(role), null);
}
}
@Test
public void testTriggerCompactionTopic() {
final String partitionTopicName = "test-part";
final String nonPartitionTopicName = "test-non-part";
// trigger compaction on non-existing topic
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.compact(response, testTenant, testNamespace, "non-existing-topic", true);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
// create non partitioned topic and compaction on it
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, nonPartitionTopicName, true,
null);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.compact(response, testTenant, testNamespace, nonPartitionTopicName, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// create partitioned topic and compaction on it
response = mock(AsyncResponse.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2, true);
persistentTopics.compact(response, testTenant, testNamespace, partitionTopicName, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}
@Test
public void testPeekWithSubscriptionNameNotExist() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
RetentionPolicies retention = new RetentionPolicies(10, 10);
admin.namespaces().setRetention("tenant-xyz/ns-abc", retention);
final String topic = "persistent://tenant-xyz/ns-abc/topic-testPeekWithSubscriptionNameNotExist";
final String subscriptionName = "sub";
((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, 3, true, null).get();
final String partitionedTopic = topic + "-partition-0";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create();
for (int i = 0; i < 10; ++i) {
producer.send("test" + i);
}
List<Message<byte[]>> messages = admin.topics().peekMessages(partitionedTopic, subscriptionName, 3);
Assert.assertEquals(messages.size(), 3);
producer.close();
}
@Test
public void testGetBacklogSizeByMessageId() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
admin.namespaces().createNamespace("prop-xyz/ns1", Set.of("test"));
final String topicName = "persistent://prop-xyz/ns1/testGetBacklogSize";
admin.topics().createPartitionedTopic(topicName, 1);
@Cleanup
Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.create();
CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
for (int i = 0; i < 10; i++) {
completableFuture = batchProducer.sendAsync("a".getBytes());
}
completableFuture.get();
Assert.assertEquals(Optional.ofNullable(
admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)),
Optional.of(320L));
}
@Test
public void testGetLastMessageId() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
admin.namespaces().createNamespace("prop-xyz/ns1", Set.of("test"));
final String topicName = "persistent://prop-xyz/ns1/testGetLastMessageId";
admin.topics().createNonPartitionedTopic(topicName);
@Cleanup
Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName)
.enableBatching(true)
.batchingMaxMessages(100)
.batchingMaxPublishDelay(2, TimeUnit.SECONDS)
.create();
admin.topics().createSubscription(topicName, "test", MessageId.earliest);
CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
for (int i = 0; i < 10; i++) {
completableFuture = batchProducer.sendAsync("test".getBytes());
}
completableFuture.get();
Assert.assertEquals(((BatchMessageIdImpl) admin.topics().getLastMessageId(topicName)).getBatchIndex(), 9);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.create();
producer.send("test".getBytes());
Assert.assertTrue(admin.topics().getLastMessageId(topicName) instanceof MessageIdImpl);
}
@Test
public void testExamineMessage() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/topic-123";
admin.topics().createPartitionedTopic(topicName, 2);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName + "-partition-0").create();
// Check examine message not allowed on partitioned topic.
try {
admin.topics().examineMessage(topicName, "earliest", 1);
Assert.fail("fail to check examine message not allowed on partitioned topic");
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getMessage(),
"Examine messages on a partitioned topic is not allowed, please try examine message on specific "
+ "topic partition");
}
try {
admin.topics().examineMessage(topicName + "-partition-0", "earliest", 1);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getMessage(),
"Could not examine messages due to the total message is zero");
}
producer.send("message1");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "earliest", 1).getData()),
"message1");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "latest", 1).getData()),
"message1");
producer.send("message2");
producer.send("message3");
producer.send("message4");
producer.send("message5");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "earliest", 1).getData()),
"message1");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "earliest", 2).getData()),
"message2");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "earliest", 3).getData()),
"message3");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "earliest", 4).getData()),
"message4");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "earliest", 5).getData()),
"message5");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "latest", 1).getData()),
"message5");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "latest", 2).getData()),
"message4");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "latest", 3).getData()),
"message3");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "latest", 4).getData()),
"message2");
Assert.assertEquals(
new String(admin.topics().examineMessage(topicName + "-partition-0", "latest", 5).getData()),
"message1");
}
@Test
public void testExamineMessageMetadata() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata";
admin.topics().createPartitionedTopic(topicName, 2);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.producerName("testExamineMessageMetadataProducer")
.compressionType(CompressionType.LZ4)
.topic(topicName + "-partition-0")
.create();
producer.newMessage()
.keyBytes("partition123".getBytes())
.orderingKey(new byte[]{0})
.replicationClusters(List.of("a", "b"))
.sequenceId(112233)
.value("data")
.send();
MessageImpl<byte[]> message = (MessageImpl<byte[]>) admin.topics().examineMessage(
topicName + "-partition-0", "earliest", 1);
//test long
Assert.assertEquals(112233, message.getSequenceId());
//test byte[]
Assert.assertEquals(new byte[]{0}, message.getOrderingKey());
//test bool and byte[]
Assert.assertEquals("partition123".getBytes(), message.getKeyBytes());
Assert.assertTrue(message.hasBase64EncodedKey());
//test arrays
Assert.assertEquals(List.of("a", "b"), message.getReplicateTo());
//test string
Assert.assertEquals(producer.getProducerName(), message.getProducerName());
//test enum
Assert.assertEquals(CompressionType.LZ4.ordinal(), message.getMessageBuilder().getCompression().ordinal());
Assert.assertEquals("data", new String(message.getData()));
}
@Test
public void testOffloadWithNullMessageId() {
final String topicName = "topic-123";
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
response = mock(AsyncResponse.class);
persistentTopics.triggerOffload(
response, testTenant, testNamespace, topicName, true, null);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(),
Response.Status.BAD_REQUEST.getStatusCode());
}
@Test
public void testSetReplicatedSubscriptionStatus() {
final String topicName = "topic-with-repl-sub";
final String partitionName = topicName + "-partition-0";
final String subName = "sub";
// 1) Return 404 if that the topic does not exist
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true,
true);
ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(10000).times(1)).resume(errorCaptor.capture());
Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
// 2) Return 404 if that the partitioned topic does not exist
response = mock(AsyncResponse.class);
persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName,
true, true);
errorCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(10000).times(1)).resume(errorCaptor.capture());
Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
// 3) Create the partitioned topic
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 4) Create a subscription
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, topicName, subName, true,
new ResetCursorData(MessageId.latest), false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 5) Enable replicated subscription on the partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true,
true);
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 6) Disable replicated subscription on the partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true,
false);
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 7) Enable replicated subscription on the partition
response = mock(AsyncResponse.class);
persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName,
true, true);
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 8) Disable replicated subscription on the partition
response = mock(AsyncResponse.class);
persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName,
true, false);
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 9) Delete the subscription
response = mock(AsyncResponse.class);
persistentTopics.deleteSubscription(response, testTenant, testNamespace, topicName, subName, false, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 10) Delete the partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, topicName, true, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}
@Test
public void testGetMessageById() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
admin.topics().createNonPartitionedTopic(topicName1);
admin.topics().createNonPartitionedTopic(topicName2);
@Cleanup
ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
.enableBatching(false).create();
String data1 = "test1";
MessageIdImpl id1 = (MessageIdImpl) producer1.send(data1.getBytes());
@Cleanup
ProducerBase<byte[]> producer2 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName2)
.enableBatching(false).create();
String data2 = "test2";
MessageIdImpl id2 = (MessageIdImpl) producer2.send(data2.getBytes());
Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId());
Assert.assertEquals(message1.getData(), data1.getBytes());
Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
Assert.assertEquals(message2.getData(), data2.getBytes());
Assert.expectThrows(PulsarAdminException.NotFoundException.class, () -> {
admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
});
Assert.expectThrows(PulsarAdminException.NotFoundException.class, () -> {
admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
});
}
@Test
public void testGetMessageById4SpecialPropsInMsg() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
admin.topics().createNonPartitionedTopic(topicName1);
Map<String, String> inSpecialProps = new HashMap<>();
inSpecialProps.put("city=shanghai", "tag");
inSpecialProps.put("city,beijing", "haidian");
@Cleanup
ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
.enableBatching(false).create();
String data1 = "test1";
MessageIdImpl id1 = (MessageIdImpl) producer1.newMessage().value(data1.getBytes()).properties(inSpecialProps)
.send();
Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId());
Assert.assertEquals(message1.getData(), data1.getBytes());
Map<String, String> outSpecialProps = message1.getProperties();
for (String k : inSpecialProps.keySet()) {
Assert.assertEquals(inSpecialProps.get(k), outSpecialProps.get(k));
}
}
@Test
public void testGetMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp";
admin.topics().createNonPartitionedTopic(topicName);
AtomicLong publishTime = new AtomicLong(0);
@Cleanup
ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.intercept(new ProducerInterceptor() {
@Override
public void close() {
}
@Override
public boolean eligible(Message message) {
return true;
}
@Override
public Message beforeSend(Producer producer, Message message) {
return message;
}
@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
Throwable exception) {
publishTime.set(message.getPublishTime());
}
})
.create();
MessageId id1 = producer.send("test1".getBytes());
long publish1 = publishTime.get();
Thread.sleep(10);
MessageId id2 = producer.send("test2".getBytes());
long publish2 = publishTime.get();
Assert.assertTrue(publish1 < publish2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
.compareTo(id2) > 0);
}
@Test
public void testGetMessageIdByTimestampWithCompaction() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestampWithCompaction";
admin.topics().createNonPartitionedTopic(topicName);
Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();
@Cleanup
ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.intercept(new ProducerInterceptor() {
@Override
public void close() {
}
@Override
public boolean eligible(Message message) {
return true;
}
@Override
public Message beforeSend(Producer producer, Message message) {
return message;
}
@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
Throwable exception) {
publishTimeMap.put(message.getMessageId(), message.getPublishTime());
}
})
.create();
MessageId id1 = producer.newMessage().key("K1").value("test1".getBytes()).send();
MessageId id2 = producer.newMessage().key("K2").value("test2".getBytes()).send();
long publish1 = publishTimeMap.get(id1);
long publish2 = publishTimeMap.get(id2);
Assert.assertTrue(publish1 < publish2);
admin.topics().triggerCompaction(topicName);
Awaitility.await().untilAsserted(() ->
assertSame(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS));
admin.topics().unload(topicName);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
assertEquals(internalStats.ledgers.size(), 1);
assertEquals(internalStats.ledgers.get(0).entries, 0);
});
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
.compareTo(id2) > 0);
}
@Test
public void testGetBatchMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp";
admin.topics().createNonPartitionedTopic(topicName);
Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();
@Cleanup
ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MINUTES)
.batchingMaxMessages(2)
.intercept(new ProducerInterceptor() {
@Override
public void close() {
}
@Override
public boolean eligible(Message message) {
return true;
}
@Override
public Message beforeSend(Producer producer, Message message) {
return message;
}
@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
Throwable exception) {
log.info("onSendAcknowledgement, message={}, msgId={},publish_time={},exception={}",
message, msgId, message.getPublishTime(), exception);
publishTimeMap.put(msgId, message.getPublishTime());
}
})
.create();
List<CompletableFuture<MessageId>> idFutureList = new ArrayList<>();
for (int i = 0; i < 4; i++) {
idFutureList.add(producer.sendAsync(new byte[]{(byte) i}));
Thread.sleep(5);
}
List<MessageIdImpl> ids = new ArrayList<>();
for (CompletableFuture<MessageId> future : idFutureList) {
MessageId id = future.get();
ids.add((MessageIdImpl) id);
}
for (MessageIdImpl messageId : ids) {
Assert.assertTrue(publishTimeMap.containsKey(messageId));
log.info("MessageId={},PublishTime={}", messageId, publishTimeMap.get(messageId));
}
//message 0, 1 are in the same batch, as batchingMaxMessages is set to 2.
Assert.assertEquals(ids.get(0).getLedgerId(), ids.get(1).getLedgerId());
MessageIdImpl id1 =
new MessageIdImpl(ids.get(0).getLedgerId(), ids.get(0).getEntryId(), ids.get(0).getPartitionIndex());
long publish1 = publishTimeMap.get(ids.get(0));
Assert.assertEquals(ids.get(2).getLedgerId(), ids.get(3).getLedgerId());
MessageIdImpl id2 =
new MessageIdImpl(ids.get(2).getLedgerId(), ids.get(2).getEntryId(), ids.get(2).getPartitionIndex());
long publish2 = publishTimeMap.get(ids.get(2));
Assert.assertTrue(publish1 < publish2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
.compareTo(id2) > 0);
}
@Test
public void testDeleteTopic() throws Exception {
final String topicName = "topic-1";
AsyncResponse response = mock(AsyncResponse.class);
BrokerService brokerService = spy(pulsar.getBrokerService());
doReturn(brokerService).when(pulsar).getBrokerService();
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, false, null);
CompletableFuture<Void> deleteTopicFuture = CompletableFuture.completedFuture(null);
doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean());
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.deleteTopic(response, testTenant, testNamespace, topicName, true, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
MockUtil.resetMock(brokerService);
CompletableFuture<Void> deleteTopicFuture2 = new CompletableFuture<>();
ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
deleteTopicFuture2.completeExceptionally(new MetadataStoreException("test exception"));
doReturn(deleteTopicFuture2).when(brokerService).deleteTopic(anyString(), anyBoolean());
response = mock(AsyncResponse.class);
persistentTopics.deleteTopic(response, testTenant, testNamespace, topicName, true, true);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
MockUtil.resetMock(brokerService);
CompletableFuture<Void> deleteTopicFuture3 = new CompletableFuture<>();
response = mock(AsyncResponse.class);
deleteTopicFuture3.completeExceptionally(new MetadataStoreException.NotFoundException());
doReturn(deleteTopicFuture3).when(brokerService).deleteTopic(anyString(), anyBoolean());
persistentTopics.deleteTopic(response, testTenant, testNamespace, topicName, false, true);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
}
public void testAdminTerminatePartitionedTopic() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
admin.namespaces().createNamespace("prop-xyz/ns12", Set.of("test"));
final String topicName = "persistent://prop-xyz/ns12/testTerminatePartitionedTopic";
admin.topics().createPartitionedTopic(topicName, 1);
Map<Integer, MessageId> results = new HashMap<>();
results.put(0, new MessageIdImpl(3, -1, -1));
Assert.assertEquals(admin.topics().terminatePartitionedTopic(topicName), results);
// Check examine message not allowed on non-partitioned topic.
admin.topics().createNonPartitionedTopic("persistent://prop-xyz/ns12/test");
try {
admin.topics().terminatePartitionedTopic(topicName);
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getMessage(),
"Termination of a non-partitioned topic is not allowed using partitioned-terminate, please use "
+ "terminate commands.");
}
}
@Test
public void testResetCursorReturnTimeoutWhenZKTimeout() {
String topic = "persistent://" + testTenant + "/" + testNamespace + "/" + "topic-2";
BrokerService brokerService = spy(pulsar.getBrokerService());
doReturn(brokerService).when(pulsar).getBrokerService();
CompletableFuture<Optional<Topic>> completableFuture = new CompletableFuture<>();
doReturn(completableFuture).when(brokerService).getTopicIfExists(topic);
completableFuture.completeExceptionally(new RuntimeException("TimeoutException"));
try {
admin.topics().resetCursor(topic, "my-sub", System.currentTimeMillis());
Assert.fail();
} catch (PulsarAdminException e) {
String errorMsg = ((InternalServerErrorException) e.getCause()).getResponse().readEntity(String.class);
Assert.assertTrue(errorMsg.contains("TimeoutException"));
}
}
@Test
public void testUpdatePartitionedTopic()
throws KeeperException, InterruptedException, PulsarAdminException {
String topicName = "testUpdatePartitionedTopic";
String groupName = "cg_testUpdatePartitionedTopic";
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespaceLocal, topicName, 2, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespaceLocal, topicName, groupName, true,
new ResetCursorData(MessageId.latest), false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
ArgumentCaptor<PartitionedTopicMetadata> metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
PartitionedTopicMetadata partitionedTopicMetadata = metaCaptor.getValue();
Assert.assertEquals(partitionedTopicMetadata.partitions, 2);
doNothing().when(persistentTopics).validatePartitionedTopicName(any(), any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(persistentTopics)
.validatePartitionedTopicMetadataAsync();
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
true, 4);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
response = mock(AsyncResponse.class);
metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
partitionedTopicMetadata = metaCaptor.getValue();
Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
// check number of new partitions must be greater than existing number of partitions
response = mock(AsyncResponse.class);
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
true, 3);
verify(response, timeout(5000).times(1)).resume(throwableCaptor.capture());
Assert.assertEquals(throwableCaptor.getValue().getMessage(),
"Expect partitions 3 can't less than current partitions 4.");
response = mock(AsyncResponse.class);
metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
partitionedTopicMetadata = metaCaptor.getValue();
Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
}
@Test
public void testInternalGetReplicatedSubscriptionStatusFromLocal() throws Exception {
String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal
+ "/testInternalGetReplicatedSubscriptionStatusFromLocal";
String subName = "sub_testInternalGetReplicatedSubscriptionStatusFromLocal";
TopicName topic = TopicName.get(topicName);
admin.topics().createPartitionedTopic(topicName, 2);
admin.topics().createSubscription(topicName, subName, MessageId.latest);
// partition-0 call from local and partition-1 call from admin.
NamespaceService namespaceService = spy(pulsar.getNamespaceService());
doReturn(CompletableFuture.completedFuture(true))
.when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(0));
doReturn(CompletableFuture.completedFuture(false))
.when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(1));
doReturn(namespaceService).when(pulsar).getNamespaceService();
PulsarAdmin adminFromPulsar = spy(pulsar.getAdminClient());
doReturn(adminFromPulsar).when(pulsar).getAdminClient();
Topics topics = spy(adminFromPulsar.topics());
doReturn(topics).when(adminFromPulsar).topics();
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.getReplicatedSubscriptionStatus(response, testTenant, testNamespaceLocal, topic.getLocalName(),
subName, false);
verify(response, timeout(5000).times(1)).resume(any());
// verify we only call getReplicatedSubscriptionStatusAsync once.
verify(topics, times(1)).getReplicatedSubscriptionStatusAsync(any(), any());
}
@Test
public void testNamespaceResources() throws Exception {
String ns1V1 = "test/" + testNamespace + "v1";
String ns1V2 = testNamespace + "v2";
admin.namespaces().createNamespace(testTenant+"/"+ns1V1);
admin.namespaces().createNamespace(testTenant+"/"+ns1V2);
List<String> namespaces = pulsar.getPulsarResources().getNamespaceResources().listNamespacesAsync(testTenant)
.get();
assertTrue(namespaces.contains(ns1V2));
assertTrue(namespaces.contains(ns1V1));
}
}