blob: 8b430e7b2fa8fb92efff523d216b3f548052a7f9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
@Test(groups = "broker-impl")
public class MessageRedeliveryTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(MessageRedeliveryTest.class);
@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@DataProvider(name = "useOpenRangeSet")
public static Object[][] useOpenRangeSet() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
/**
* It tests that ManagedCursor tracks individually deleted messages and markDeletePosition correctly with different
* range-set implementation and re-delivers messages as expected.
*
* @param useOpenRangeSet
* @throws Exception
*/
@Test(dataProvider = "useOpenRangeSet", timeOut = 30000)
public void testRedelivery(boolean useOpenRangeSet) throws Exception {
this.conf.setManagedLedgerMaxEntriesPerLedger(5);
this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
this.conf.setManagedLedgerUnackedRangesOpenCacheSetEnabled(useOpenRangeSet);
@Cleanup("shutdownNow")
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20,
new DefaultThreadFactory("pulsar"));
final String ns1 = "my-property/brok-ns1";
final String subName = "my-subscriber-name";
final int numMessages = 50;
admin.namespaces().createNamespace(ns1, Sets.newHashSet("test"));
final String topic1 = "persistent://" + ns1 + "/my-topic";
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1)
.subscriptionName(subName).subscriptionType(SubscriptionType.Shared).receiverQueueSize(10)
.acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS).subscribe();
ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1)
.subscriptionName(subName).subscriptionType(SubscriptionType.Shared).receiverQueueSize(10)
.acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS).subscribe();
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic1).create();
for (int i = 0; i < numMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
CountDownLatch latch = new CountDownLatch(numMessages);
AtomicBoolean consume1 = new AtomicBoolean(true);
AtomicBoolean consume2 = new AtomicBoolean(true);
Set<String> ackedMessages = Sets.newConcurrentHashSet();
AtomicInteger counter = new AtomicInteger(0);
// (1) ack alternate message from consumer-1 which creates ack-hole.
executor.submit(() -> {
while (true) {
try {
Message<byte[]> msg = consumer1.receive(1000, TimeUnit.MILLISECONDS);
if (msg != null) {
if (counter.getAndIncrement() % 2 == 0) {
try {
consumer1.acknowledge(msg);
// ack alternate messages
ackedMessages.add(new String(msg.getData()));
} catch (PulsarClientException e1) {
log.warn("Failed to ack message {}", e1.getMessage());
}
}
} else {
break;
}
} catch (PulsarClientException e2) {
// Ok
break;
}
latch.countDown();
}
});
// (2) ack all the consumed messages from consumer-2
executor.submit(() -> {
while (consume2.get()) {
try {
Message<byte[]> msg = consumer2.receive(1000, TimeUnit.MILLISECONDS);
if (msg != null) {
consumer2.acknowledge(msg);
// ack alternate messages
ackedMessages.add(new String(msg.getData()));
} else {
break;
}
} catch (PulsarClientException e2) {
// Ok
break;
}
latch.countDown();
}
});
latch.await(10000, TimeUnit.MILLISECONDS);
consume1.set(false);
// (3) sleep so, consumer2 should timeout on it's pending read operation and not consume more messages
Thread.sleep(1000);
// (4) here we consume all messages but consumer1 only acked alternate messages.
assertNotEquals(ackedMessages.size(), numMessages);
PersistentTopic pTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topic1).get()
.get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) pTopic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
// (5) now, close consumer1 and let broker deliver consumer1's unack messages to consumer2
consumer1.close();
// (6) broker should redeliver all unack messages of consumer-1 and consumer-2 should ack all of them
CountDownLatch latch2 = new CountDownLatch(1);
executor.submit(() -> {
while (true) {
try {
Message<byte[]> msg = consumer2.receive(1000, TimeUnit.MILLISECONDS);
if (msg != null) {
consumer2.acknowledge(msg);
// ack alternate messages
ackedMessages.add(new String(msg.getData()));
} else {
break;
}
} catch (PulsarClientException e2) {
// Ok
break;
}
if (ackedMessages.size() == numMessages)
latch2.countDown();
}
});
latch2.await(20000, TimeUnit.MILLISECONDS);
consumer2.close();
assertEquals(ackedMessages.size(), numMessages);
// (7) acked message set should be empty
assertEquals(cursor.getIndividuallyDeletedMessagesSet().size(), 0);
// markDelete position should be one position behind read position
assertEquals(cursor.getReadPosition(), cursor.getMarkDeletedPosition().getNext());
producer.close();
consumer2.close();
}
@Test
public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException, PulsarAdminException {
final String topic = "testDoNotRedeliveryMarkDeleteMessages";
final String subName = "my-sub";
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
producer.send("Pulsar".getBytes());
for (int i = 0; i < 2; i++) {
Message message = consumer.receive();
assertNotNull(message);
}
admin.topics().skipAllMessages(topic, subName);
Message message = null;
try {
message = consumer.receive(2, TimeUnit.SECONDS);
} catch (Exception ignore) {
}
assertNull(message);
}
}