// Source blob: 6c27e2cd5f22be8f14b2ddaf45a9b9ef3ce5549c
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
public class SubscriptionSeekTest extends BrokerTestBase {
/**
 * Runs once before the tests of this class: performs the shared base setup and then enables
 * batch-index-level acknowledgment on {@code conf}.
 */
@BeforeClass
@Override
protected void setup() throws Exception {
super.baseSetup();
// NOTE(review): the flag is set after baseSetup() has already run; presumably the broker reads it
// lazily from the same conf object - confirm that it really takes effect in this order.
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
}
/**
 * Runs once after all tests of this class (even if some of them failed, because of
 * {@code alwaysRun = true}) and delegates to the inherited {@code internalCleanup()}.
 */
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
/**
 * Checks how the subscription backlog of a non-partitioned topic reacts to seeks: latest empties it,
 * earliest restores all ten entries, a concrete message id keeps the tail, and ids that point into a
 * lower / higher ledger than the published one behave like earliest / latest.
 */
@Test
public void testSeek() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/testSeek";

    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .create();

    // receiverQueueSize(0) disables consumer pre-fetch so the received messages can be tracked
    org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName("my-subscription")
            .receiverQueueSize(0)
            .subscribe();

    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    assertNotNull(persistentTopic);
    assertEquals(persistentTopic.getProducers().size(), 1);
    assertEquals(persistentTopic.getSubscriptions().size(), 1);

    List<MessageId> publishedIds = new ArrayList<>();
    for (int n = 0; n < 10; n++) {
        publishedIds.add(producer.send(("my-message-" + n).getBytes()));
    }

    PersistentSubscription subscription = persistentTopic.getSubscription("my-subscription");
    assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);

    consumer.seek(MessageId.latest);
    assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0);

    // Wait for the consumer to reconnect before issuing the next seek
    Awaitility.await().until(() -> consumer.isConnected());
    consumer.seek(MessageId.earliest);
    assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);

    Awaitility.await().until(() -> consumer.isConnected());
    consumer.seek(publishedIds.get(5));
    assertEquals(subscription.getNumberOfEntriesInBacklog(false), 5);

    // Build two ids with the same entry/partition as message #5 but in the neighbouring ledger ids
    MessageIdImpl pivot = (MessageIdImpl) publishedIds.get(5);
    long pivotLedger = pivot.getLedgerId();
    MessageIdImpl beforeEarliest =
            new MessageIdImpl(pivotLedger - 1, pivot.getEntryId(), pivot.getPartitionIndex());
    MessageIdImpl afterLatest =
            new MessageIdImpl(pivotLedger + 1, pivot.getEntryId(), pivot.getPartitionIndex());
    log.info("MessageId {}: beforeEarliest: {}, afterLatest: {}", pivot, beforeEarliest, afterLatest);

    Awaitility.await().until(() -> consumer.isConnected());
    consumer.seek(beforeEarliest);
    assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);

    Awaitility.await().until(() -> consumer.isConnected());
    consumer.seek(afterLatest);
    assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0);
}
/**
 * Publishes ten messages in batches of at most three and checks that a consumer created with
 * {@code startMessageIdInclusive()} can seek to earliest, to latest and to every individual message id.
 *
 * <p>Producer and consumer are closed in {@code finally} blocks so that a failing assertion cannot
 * leave a consumer attached to this topic/subscription for the remaining tests of the class.
 */
@Test
public void testSeekForBatch() throws Exception {
    final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
    String subscriptionName = "my-subscription-batch";

    List<MessageId> messageIds = new ArrayList<>();
    List<String> messages = new ArrayList<>();

    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .enableBatching(true)
            .batchingMaxMessages(3)
            .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
            .topic(topicName).create();
    try {
        // Send everything asynchronously first so the messages can actually be batched together
        List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            messages.add(message);
            futureMessageIds.add(producer.sendAsync(message));
        }
        for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
            messageIds.add(futureMessageId.get());
        }
    } finally {
        producer.close();
    }

    org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .startMessageIdInclusive()
            .subscribe();
    try {
        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        assertNotNull(topicRef);
        assertEquals(topicRef.getSubscriptions().size(), 1);

        consumer.seek(MessageId.earliest);
        Message<String> receivedAfterEarliest = consumer.receive();
        assertEquals(receivedAfterEarliest.getValue(), messages.get(0));

        // Nothing is published after the seek to latest, so the timed receive must come back empty
        consumer.seek(MessageId.latest);
        Message<String> receivedAfterLatest = consumer.receive(1, TimeUnit.SECONDS);
        assertNull(receivedAfterLatest);

        // Seeking to a concrete id must deliver exactly that message first
        for (MessageId messageId : messageIds) {
            consumer.seek(messageId);
            MessageId receiveId = consumer.receive().getMessageId();
            assertEquals(receiveId, messageId);
        }
    } finally {
        consumer.close();
    }
}
/**
 * Publishes three messages that end up in one batch entry and checks that a consumer created with
 * {@code startMessageIdInclusive()} can seek to a specific batch index inside that entry, as well as to
 * earliest, latest and every individual message id.
 *
 * <p>The test uses its own topic name so it does not share a topic with other tests of this class, and
 * the dedicated client is closed in a {@code finally} block so it is released even when an assertion fails.
 */
@Test
public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception {
    final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchMessageAndSpecifiedBatchIndex";
    String subscriptionName = "my-subscription-batch";

    List<MessageId> messageIds = new ArrayList<>();
    List<String> messages = new ArrayList<>();

    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .enableBatching(true)
            .batchingMaxMessages(3)
            // set batch max publish delay big enough to make sure entry has 3 messages
            .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
            .topic(topicName).create();
    try {
        List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            String message = "my-message-" + i;
            messages.add(message);
            futureMessageIds.add(producer.sendAsync(message));
        }
        for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
            messageIds.add(futureMessageId.get());
        }
    } finally {
        producer.close();
    }

    // All three ids must be batch ids that live in the same entry
    for (MessageId messageId : messageIds) {
        assertTrue(messageId instanceof BatchMessageIdImpl);
    }
    BatchMessageIdImpl batchMsgId0 = (BatchMessageIdImpl) messageIds.get(0);
    BatchMessageIdImpl batchMsgId1 = (BatchMessageIdImpl) messageIds.get(1);
    BatchMessageIdImpl msgIdToSeekFirst = (BatchMessageIdImpl) messageIds.get(2);
    assertEquals(batchMsgId0.getEntryId(), batchMsgId1.getEntryId());
    assertEquals(batchMsgId1.getEntryId(), msgIdToSeekFirst.getEntryId());

    PulsarClient newPulsarClient = PulsarClient.builder()
            // set start backoff interval short enough to make sure client will re-connect quickly
            .startingBackoffInterval(1, TimeUnit.MICROSECONDS)
            .serviceUrl(lookupUrl.toString())
            .build();
    try {
        org.apache.pulsar.client.api.Consumer<String> consumer = newPulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .startMessageIdInclusive()
                .subscribe();

        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        assertNotNull(topicRef);
        assertEquals(topicRef.getSubscriptions().size(), 1);

        // Seek straight to the last index of the batch
        consumer.seek(msgIdToSeekFirst);
        MessageId msgId = consumer.receive().getMessageId();
        assertTrue(msgId instanceof BatchMessageIdImpl);
        BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
        assertEquals(batchMsgId, msgIdToSeekFirst);

        consumer.seek(MessageId.earliest);
        Message<String> receivedAfterEarliest = consumer.receive();
        assertEquals(receivedAfterEarliest.getValue(), messages.get(0));

        consumer.seek(MessageId.latest);
        Message<String> receivedAfterLatest = consumer.receive(1, TimeUnit.SECONDS);
        assertNull(receivedAfterLatest);

        for (MessageId messageId : messageIds) {
            consumer.seek(messageId);
            MessageId receiveId = consumer.receive().getMessageId();
            assertEquals(receiveId, messageId);
        }
    } finally {
        newPulsarClient.close();
    }
}
/**
 * Moves the cursor of a batch-message subscription through the admin API and checks what the consumer
 * receives next after each reset: earliest, latest, the first and last published id (with the boolean
 * argument of {@code resetCursor} both {@code true} and {@code false}) and a batch id built from -1
 * coordinates.
 */
@Test
public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
    final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID();
    final String subscriptionName = "my-subscription-batch";

    Producer<String> batchProducer = pulsarClient.newProducer(Schema.STRING)
            .enableBatching(true)
            .batchingMaxMessages(3)
            .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
            .topic(topicName)
            .create();

    // Fire all sends first, then collect the ids in publish order
    List<String> payloads = new ArrayList<>();
    List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
    for (int n = 0; n < 10; n++) {
        String payload = "my-message-" + n;
        payloads.add(payload);
        pendingSends.add(batchProducer.sendAsync(payload));
    }
    List<MessageId> publishedIds = new ArrayList<>();
    for (CompletableFuture<MessageId> pending : pendingSends) {
        publishedIds.add(pending.get());
    }
    batchProducer.close();

    final MessageId firstId = publishedIds.get(0);
    final MessageId lastId = publishedIds.get(publishedIds.size() - 1);

    org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe();

    // After every reset, wait for the consumer to reconnect before receiving

    // reset to earliest -> first payload is delivered
    admin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);
    Awaitility.await().until(() -> consumer.isConnected());
    Message<String> afterEarliest = consumer.receive();
    assertEquals(afterEarliest.getValue(), payloads.get(0));

    // reset to latest -> nothing to receive
    admin.topics().resetCursor(topicName, subscriptionName, MessageId.latest);
    Awaitility.await().until(() -> consumer.isConnected());
    Message<String> afterLatest = consumer.receive(1, TimeUnit.SECONDS);
    assertNull(afterLatest);

    // first id, flag = true -> delivery starts with the second message
    admin.topics().resetCursor(topicName, subscriptionName, firstId, true);
    Awaitility.await().until(() -> consumer.isConnected());
    Message<String> next = consumer.receive();
    assertEquals(next.getMessageId(), publishedIds.get(1));

    // first id, flag = false -> delivery starts with the first message itself
    admin.topics().resetCursor(topicName, subscriptionName, firstId, false);
    Awaitility.await().until(() -> consumer.isConnected());
    next = consumer.receive();
    assertEquals(next.getMessageId(), firstId);

    // last id, flag = true -> nothing left
    admin.topics().resetCursor(topicName, subscriptionName, lastId, true);
    Awaitility.await().until(() -> consumer.isConnected());
    next = consumer.receive(1, TimeUnit.SECONDS);
    assertNull(next);

    // last id, flag = false -> the last message is delivered again
    admin.topics().resetCursor(topicName, subscriptionName, lastId, false);
    Awaitility.await().until(() -> consumer.isConnected());
    next = consumer.receive();
    assertEquals(next.getMessageId(), lastId);

    // batch id made of -1 coordinates -> delivery restarts from the first message
    admin.topics().resetCursor(topicName, subscriptionName, new BatchMessageIdImpl(-1, -1, -1, 10), true);
    Awaitility.await().until(() -> consumer.isConnected());
    next = consumer.receive();
    assertEquals(next.getMessageId(), firstId);
}
/**
 * Issues four {@code resetCursor} admin calls for the same subscription from four threads at once.
 * Calls are allowed to fail, but every failure must carry the "Failed to fence subscription" message.
 *
 * <p>The failures are collected in a thread-safe list because all four worker threads may append to
 * it concurrently.
 */
@Test
public void testConcurrentResetCursor() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
    final String subscriptionName = "test-sub-name";
    final int threadCount = 4;

    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
    try {
        admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);

        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        assertNotNull(topicRef);
        assertEquals(topicRef.getProducers().size(), 1);

        List<MessageId> messageIds = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            messageIds.add(producer.send(message.getBytes()));
        }
        final MessageId resetTarget = messageIds.get(3);

        // Written by the worker threads, read by the test thread after join()
        List<PulsarAdminException> exceptions = new CopyOnWriteArrayList<>();

        List<Thread> resetCursorThreads = new ArrayList<>(threadCount);
        for (int i = 0; i < threadCount; i++) {
            resetCursorThreads.add(new Thread(() -> {
                try {
                    admin.topics().resetCursor(topicName, subscriptionName, resetTarget);
                } catch (PulsarAdminException e) {
                    exceptions.add(e);
                }
            }));
        }

        // Start all workers before joining any of them so the resets really overlap
        for (Thread thread : resetCursorThreads) {
            thread.start();
        }
        for (Thread thread : resetCursorThreads) {
            thread.join();
        }

        for (PulsarAdminException exception : exceptions) {
            log.error("Meet Exception", exception);
            assertTrue(exception.getMessage().contains("Failed to fence subscription"));
        }
    } finally {
        producer.close();
    }
}
/**
 * Checks that seeking to {@code MessageId.latest} on a consumer of a two-partition topic does not
 * throw. If it does, the original exception is attached to the test failure, and the consumer is
 * closed in either case.
 */
@Test
public void testSeekOnPartitionedTopic() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/testSeekPartitions";
    admin.topics().createPartitionedTopic(topicName, 2);

    org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
            .subscriptionName("my-subscription").subscribe();
    try {
        consumer.seek(MessageId.latest);
    } catch (PulsarClientException e) {
        // Keep the cause so the report shows why the seek was rejected
        fail("Should not have exception", e);
    } finally {
        consumer.close();
    }
}
/**
 * Seeks a subscription on a non-partitioned topic by timestamp: seeking to "now" (taken after all ten
 * messages were published) must empty the backlog, seeking 100 seconds further back must restore it.
 */
@Test
public void testSeekTime() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/testSeekTime";
    final String subName = "my-subscription";

    long rewindSeconds = RelativeTimeUtil.parseRelativeTimeInSeconds("100s");
    long rewindMillis = TimeUnit.SECONDS.toMillis(rewindSeconds);

    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .create();

    // receiverQueueSize(0) disables consumer pre-fetch so the received messages can be tracked
    org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subName)
            .receiverQueueSize(0)
            .subscribe();

    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    assertNotNull(persistentTopic);
    assertEquals(persistentTopic.getProducers().size(), 1);
    assertEquals(persistentTopic.getSubscriptions().size(), 1);
    PersistentSubscription subscription = persistentTopic.getSubscription(subName);

    for (int n = 0; n < 10; n++) {
        producer.send(("my-message-" + n).getBytes());
    }
    assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);

    long seekPoint = System.currentTimeMillis();
    consumer.seek(seekPoint);
    assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0);

    // Wait for the consumer to reconnect before the second seek
    Awaitility.await().until(() -> consumer.isConnected());
    consumer.seek(seekPoint - rewindMillis);
    assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);
}
/**
 * Exercises {@code seek(Function)} on a four-partition topic. First every partition is sent to the
 * current timestamp (nothing may be received), then each partition gets its own target: latest,
 * earliest, the current timestamp and an older timestamp. Only the partitions rewound to earliest
 * and to the older timestamp are expected to deliver their messages again.
 */
@Test
public void testSeekTimeByFunction() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
    final int partitionNum = 4;
    final int msgNum = 20;

    admin.topics().createPartitionedTopic(topicName, partitionNum);
    creatProducerAndSendMsg(topicName, msgNum);

    org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
            .newConsumer(Schema.STRING)
            .startMessageIdInclusive()
            .topic(topicName)
            .subscriptionName("my-sub")
            .subscribe();

    long now = System.currentTimeMillis();

    // Same timestamp for every partition: no message is newer than "now"
    consumer.seek((topic) -> now);
    assertNull(consumer.receive(1, TimeUnit.SECONDS));

    // A different seek target per partition
    consumer.seek((topic) -> {
        int partition = TopicName.get(topic).getPartitionIndex();
        if (partition == 0) {
            return MessageId.latest;
        }
        if (partition == 1) {
            return MessageId.earliest;
        }
        if (partition == 2) {
            return now;
        }
        if (partition == 3) {
            return now - 999999;
        }
        return null;
    });

    // Drain until a receive times out
    int received = 0;
    while (consumer.receive(1, TimeUnit.SECONDS) != null) {
        received++;
    }

    int perPartition = msgNum / partitionNum;
    int expectedFromPartition0 = 0;             // sought to latest
    int expectedFromPartition1 = perPartition;  // sought to earliest
    int expectedFromPartition2 = 0;             // sought to "now"
    int expectedFromPartition3 = perPartition;  // sought to an older timestamp
    assertEquals(received,
            expectedFromPartition0 + expectedFromPartition1 + expectedFromPartition2 + expectedFromPartition3);
}
/**
 * Timestamp-based seek on a two-partition topic: the backlog summed over both partition subscriptions
 * must be 10 after publishing, 0 after seeking to the current time and 10 again after seeking 100
 * seconds further back.
 */
@Test
public void testSeekTimeOnPartitionedTopic() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/testSeekTimePartitions";
    final String subName = "my-subscription";
    final int partitions = 2;

    long rewindSeconds = RelativeTimeUtil.parseRelativeTimeInSeconds("100s");
    long rewindMillis = TimeUnit.SECONDS.toMillis(rewindSeconds);

    admin.topics().createPartitionedTopic(topicName, partitions);

    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .create();
    org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subName)
            .subscribe();

    // Look up the broker-side subscription of every partition
    List<PersistentSubscription> partitionSubs = new ArrayList<>();
    for (int p = 0; p < partitions; p++) {
        String partitionName = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX + p;
        PersistentTopic partitionTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(partitionName).get();
        assertNotNull(partitionTopic);
        assertEquals(partitionTopic.getProducers().size(), 1);
        assertEquals(partitionTopic.getSubscriptions().size(), 1);

        PersistentSubscription partitionSub = partitionTopic.getSubscription(subName);
        assertNotNull(partitionSub);
        partitionSubs.add(partitionSub);
    }

    for (int n = 0; n < 10; n++) {
        producer.send(("my-message-" + n).getBytes());
    }

    long totalBacklog = partitionSubs.stream()
            .mapToLong(s -> s.getNumberOfEntriesInBacklog(false))
            .sum();
    assertEquals(totalBacklog, 10);

    long seekPoint = System.currentTimeMillis();
    consumer.seek(seekPoint);
    totalBacklog = partitionSubs.stream()
            .mapToLong(s -> s.getNumberOfEntriesInBacklog(false))
            .sum();
    assertEquals(totalBacklog, 0);

    // Wait for the consumer to reconnect before the second seek
    Awaitility.await().until(() -> consumer.isConnected());
    consumer.seek(seekPoint - rewindMillis);
    totalBacklog = partitionSubs.stream()
            .mapToLong(s -> s.getNumberOfEntriesInBacklog(false))
            .sum();
    assertEquals(totalBacklog, 10);
}
/**
 * With two consumers on a Shared subscription, a seek issued by one of them must disconnect both:
 * after the seek, neither broker-side consumer may still report a {@code connectedSince} value that
 * was recorded before the seek.
 *
 * <p>The test waits until both client consumers are connected again before inspecting the broker-side
 * consumer list; waiting for only one of them would make the size assertion race against the other
 * consumer's reconnect.
 */
@Test
public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek";

    org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionName("my-subscription")
            .subscribe();
    org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionName("my-subscription")
            .subscribe();

    PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    assertNotNull(topicRef);
    assertEquals(topicRef.getSubscriptions().size(), 1);

    // Remember when each broker-side consumer connected, to detect reconnects later
    List<Consumer> consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
    assertEquals(consumers.size(), 2);
    Set<String> connectedSinceSet = new HashSet<>();
    for (Consumer consumer : consumers) {
        connectedSinceSet.add(consumer.getStats().getConnectedSince());
    }
    assertEquals(connectedSinceSet.size(), 2);

    consumer1.seek(MessageId.earliest);
    // Wait for both consumers to reconnect
    Awaitility.await().until(() -> consumer1.isConnected() && consumer2.isConnected());

    consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
    assertEquals(consumers.size(), 2);
    for (Consumer consumer : consumers) {
        assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince()));
    }
}
/**
 * With two consumers on a Failover subscription, a seek issued by the first consumer must leave at
 * least one broker-side consumer untouched, i.e. still reporting a {@code connectedSince} value that
 * was recorded before the seek.
 */
@Test
public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek";
    final String subName = "my-subscription";

    org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionType(SubscriptionType.Failover)
            .subscriptionName(subName)
            .subscribe();
    // Second member of the failover group; only its broker-side state is inspected
    pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionType(SubscriptionType.Failover)
            .subscriptionName(subName)
            .subscribe();

    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    assertNotNull(persistentTopic);
    assertEquals(persistentTopic.getSubscriptions().size(), 1);

    // Snapshot the connection timestamps before the seek
    List<Consumer> beforeSeek = persistentTopic.getSubscriptions().get(subName).getConsumers();
    assertEquals(beforeSeek.size(), 2);
    Set<String> connectedSinceBefore = new HashSet<>();
    for (Consumer brokerConsumer : beforeSeek) {
        connectedSinceBefore.add(brokerConsumer.getStats().getConnectedSince());
    }
    assertEquals(connectedSinceBefore.size(), 2);

    consumer1.seek(MessageId.earliest);
    // Wait for consumer to reconnect
    Awaitility.await().until(() -> consumer1.isConnected());

    List<Consumer> afterSeek = persistentTopic.getSubscriptions().get(subName).getConsumers();
    assertEquals(afterSeek.size(), 2);

    boolean someConsumerKeptItsConnection = afterSeek.stream()
            .anyMatch(c -> connectedSinceBefore.contains(c.getStats().getConnectedSince()));
    assertTrue(someConsumerKeptItsConnection);
}
/**
 * Exercises {@code seek(Function)} with message ids on a four-partition topic: partition 0 is sent to
 * its middle message, partition 1 to its last message, partition 2 to its first message, and
 * partition 3 gets {@code null} from the function. The test then drains the consumer and checks both
 * the number of distinct ids received and that no id from before the middle of partition 0 shows up.
 *
 * <p>The "must not be received" list covers every partition-0 message before the seek target,
 * including the one directly in front of it. Readers and consumer are closed in {@code finally} blocks.
 */
@Test
public void testSeekByFunction() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
    int partitionNum = 4;
    int msgNum = 160;
    admin.topics().createPartitionedTopic(topicName, partitionNum);
    creatProducerAndSendMsg(topicName, msgNum);
    org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
            .newConsumer(Schema.STRING).startMessageIdInclusive()
            .topic(topicName).subscriptionName("my-sub").subscribe();
    try {
        TopicName partitionedTopic = TopicName.get(topicName);

        // Collect every message id of partition-0
        List<MessageId> list = new ArrayList<>();
        Reader<String> partition0Reader = pulsarClient.newReader(Schema.STRING)
                .startMessageId(MessageId.earliest)
                .topic(partitionedTopic.getPartition(0).toString()).create();
        try {
            while (partition0Reader.hasMessageAvailable()) {
                list.add(partition0Reader.readNext().getMessageId());
            }
        } finally {
            partition0Reader.close();
        }
        // get middle msg from partition-0
        int middleIndex = list.size() / 2;
        MessageId middleMsgIdInPartition0 = list.get(middleIndex);
        // everything strictly before the middle message must not be delivered after the seek
        List<MessageId> msgNotIn = list.subList(0, middleIndex);

        // get last msg from partition-1
        MessageId lastMsgInPartition1 = admin.topics().getLastMessageId(partitionedTopic.getPartition(1).toString());

        // get first msg from partition-2
        MessageId firstMsgInPartition2;
        Reader<String> partition2Reader = pulsarClient.newReader(Schema.STRING)
                .startMessageId(MessageId.earliest)
                .topic(partitionedTopic.getPartition(2).toString()).create();
        try {
            firstMsgInPartition2 = partition2Reader.readNext().getMessageId();
        } finally {
            partition2Reader.close();
        }

        consumer.seek((topic) -> {
            int index = TopicName.get(topic).getPartitionIndex();
            if (index == 0) {
                return middleMsgIdInPartition0;
            } else if (index == 1) {
                return lastMsgInPartition1;
            } else if (index == 2) {
                return firstMsgInPartition2;
            }
            return null;
        });

        // Drain until a receive times out, remembering the per-partition (inner) message ids
        Set<MessageId> received = new HashSet<>();
        while (true) {
            Message<String> message = consumer.receive(2, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
            received.add(topicMessageId.getInnerMessageId());
        }

        int msgNumFromPartition0 = list.size() - middleIndex; // middle message up to the end
        int msgNumFromPartition1 = 1;                         // only the last message
        int msgNumFromPartition2 = msgNum / partitionNum;     // the whole partition
        assertEquals(received.size(), msgNumFromPartition0 + msgNumFromPartition1 + msgNumFromPartition2);
        assertTrue(received.contains(middleMsgIdInPartition0));
        assertTrue(received.contains(lastMsgInPartition1));
        assertTrue(received.contains(firstMsgInPartition2));
        for (MessageId messageId : msgNotIn) {
            assertFalse(received.contains(messageId));
        }
    } finally {
        consumer.close();
    }
}
/**
 * Creates a non-batching, round-robin String producer on {@code topic}, synchronously sends
 * {@code msgNum} messages ("msg0", "msg1", ...) and closes the producer again, also when a send fails.
 *
 * @param topic  topic to publish to
 * @param msgNum number of messages to send
 * @return the ids of the sent messages, in send order
 * @throws Exception if creating the producer, sending or closing fails
 */
private List<MessageId> creatProducerAndSendMsg(String topic, int msgNum) throws Exception {
    List<MessageId> messageIds = new ArrayList<>();
    Producer<String> producer = pulsarClient
            .newProducer(Schema.STRING)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .topic(topic).create();
    try {
        for (int i = 0; i < msgNum; i++) {
            messageIds.add(producer.send("msg" + i));
        }
    } finally {
        producer.close();
    }
    return messageIds;
}
/**
 * Exercises {@code seek(Function)} on a consumer subscribed to two three-partition topics. For both
 * topics, partitions 0 and 2 are sent to their last message id and every other partition to earliest;
 * the number of messages received afterwards is compared with the resulting expectation.
 */
@Test
public void testSeekByFunctionAndMultiTopic() throws Exception {
    final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
    final String topicName2 = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
    final int partitionNum = 3;
    final int msgNum = 15;

    admin.topics().createPartitionedTopic(topicName, partitionNum);
    admin.topics().createPartitionedTopic(topicName2, partitionNum);
    creatProducerAndSendMsg(topicName, msgNum);
    creatProducerAndSendMsg(topicName2, msgNum);

    // Names of the partitions that get a dedicated seek target
    final String firstTopicP0 = TopicName.get(topicName).getPartition(0).toString();
    final String firstTopicP2 = TopicName.get(topicName).getPartition(2).toString();
    final String secondTopicP0 = TopicName.get(topicName2).getPartition(0).toString();
    final String secondTopicP2 = TopicName.get(topicName2).getPartition(2).toString();

    final MessageId lastOfFirstTopicP0 = admin.topics().getLastMessageId(firstTopicP0);
    final MessageId lastOfFirstTopicP2 = admin.topics().getLastMessageId(firstTopicP2);
    final MessageId lastOfSecondTopicP0 = admin.topics().getLastMessageId(secondTopicP0);
    final MessageId lastOfSecondTopicP2 = admin.topics().getLastMessageId(secondTopicP2);

    org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
            .newConsumer(Schema.STRING)
            .startMessageIdInclusive()
            .topics(Arrays.asList(topicName, topicName2))
            .subscriptionName("my-sub")
            .subscribe();

    consumer.seek((partitionedTopic) -> {
        if (firstTopicP0.equals(partitionedTopic)) {
            return lastOfFirstTopicP0;
        } else if (firstTopicP2.equals(partitionedTopic)) {
            return lastOfFirstTopicP2;
        } else if (secondTopicP0.equals(partitionedTopic)) {
            return lastOfSecondTopicP0;
        } else if (secondTopicP2.equals(partitionedTopic)) {
            return lastOfSecondTopicP2;
        } else {
            return MessageId.earliest;
        }
    });

    // Drain until a receive times out
    int received = 0;
    while (consumer.receive(2, TimeUnit.SECONDS) != null) {
        received++;
    }

    int fromPartition0 = 1;                      // only the last message
    int fromPartition1 = msgNum / partitionNum;  // whole partition (earliest)
    int fromPartition2 = 1;                      // only the last message
    // the same expectation holds for each of the two topics
    assertEquals(received, (fromPartition0 + fromPartition1 + fromPartition2) * 2);
}
/**
 * Checks the error handling of the function-based seek API: a {@code null} function is rejected
 * synchronously, a function that returns {@code null} completes normally, and a function that returns
 * an unsupported object type fails the returned future.
 */
@Test
public void testExceptionBySeekFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
creatProducerAndSendMsg(topicName,10);
// NOTE(review): the consumer is declared with a raw type; presumably this is what lets the
// Function<String, MessageId> cast and the Object-returning lambda below compile - confirm before
// adding a type argument.
org.apache.pulsar.client.api.Consumer consumer = pulsarClient
.newConsumer()
.topic(topicName).subscriptionName("my-sub").subscribe();
// Case 1: null function -> seek() itself throws a PulsarClientException
try {
// the cast selects the Function overload of seek() for the null argument
consumer.seek((Function<String, MessageId>) null);
fail("should fail");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
assertTrue(e.getMessage().contains("Function must be set"));
}
// Case 2: the function returns null -> the future completes with null and no exception
assertNull(consumer.seekAsync((topic)-> null).get());
// Case 3: the function returns an unsupported type -> get() throws and the cause is a PulsarClientException
try {
assertNull(consumer.seekAsync((topic)-> new Object()).get());
fail("should fail");
} catch (Exception e) {
assertTrue(e.getCause() instanceof PulsarClientException);
assertTrue(e.getCause().getMessage().contains("Only support seek by messageId or timestamp"));
}
}
}