blob: c377043df8dfa4aeca0b5aa3571e354d4f4d4d78 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.Unpooled;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.Bytes;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessagesImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* Pulsar client transaction test.
*/
@Slf4j
@Test(groups = "broker")
public class TransactionTest extends TransactionTestBase {
// Broker count handed to setUpBase() as its first argument in setup().
private static final int NUM_BROKERS = 1;
// Partition count handed to setUpBase() as its second argument in setup();
// what exactly is partitioned is decided by the base class — not visible in this file.
private static final int NUM_PARTITIONS = 1;
/**
 * Class-level setup: delegates to the base-class {@code setUpBase} with this class's constants.
 */
@BeforeClass
protected void setup() throws Exception {
    // NOTE(review): presumably a topic under NAMESPACE1 that the base class prepares — confirm in
    // TransactionTestBase.
    final String baseSetupTarget = NAMESPACE1 + "/test";
    setUpBase(NUM_BROKERS, NUM_PARTITIONS, baseSetupTarget, 0);
}
/**
 * Class-level teardown. {@code alwaysRun = true} asks TestNG to run it even when earlier
 * configuration or test methods failed; the work itself is delegated to the base-class
 * {@code internalCleanup()}.
 */
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
    super.internalCleanup();
}
/**
 * Publishes one message in each of three transactions on the same topic — the first is committed,
 * the second aborted, the third left open — and then checks the committed / aborted / ongoing
 * transaction counters reported by the topic stats.
 */
@Test
public void testTopicTransactionMetrics() throws Exception {
    final String topic = "persistent://tnx/ns1/test_transaction_topic";

    @Cleanup
    Producer<byte[]> producer = this.pulsarClient.newProducer()
            .topic(topic)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();

    // Transaction 1: publish, then commit.
    Transaction committedTxn = pulsarClient.newTransaction()
            .withTransactionTimeout(1, TimeUnit.MINUTES)
            .build()
            .get();
    byte[] committedPayload = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
    producer.newMessage(committedTxn).value(committedPayload).send();
    committedTxn.commit().get();

    // Transaction 2: publish, then abort.
    Transaction abortedTxn = pulsarClient.newTransaction()
            .withTransactionTimeout(1, TimeUnit.MINUTES)
            .build()
            .get();
    byte[] abortedPayload = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
    producer.newMessage(abortedTxn).value(abortedPayload).send();
    abortedTxn.abort().get();

    // Transaction 3: publish only; it stays open so that it is counted as ongoing below.
    Transaction openTxn = pulsarClient.newTransaction()
            .withTransactionTimeout(1, TimeUnit.MINUTES)
            .build()
            .get();
    byte[] openPayload = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
    producer.newMessage(openTxn).value(openPayload).send();

    Optional<Topic> topicRef = pulsarServiceList.get(0)
            .getBrokerService()
            .getTopic(topic, false)
            .get();
    assertTrue(topicRef.isPresent());

    TopicStatsImpl topicStats = ((PersistentTopic) topicRef.get()).getStats(false, false, false);
    assertEquals(topicStats.committedTxnCount, 1);
    assertEquals(topicStats.abortedTxnCount, 1);
    assertEquals(topicStats.ongoingTxnCount, 1);
}
/**
 * Checks how the pending-ack system topic derived from a user topic/subscription is exposed:
 * it must not show up in the namespace topic list, and attempts to subscribe to it, list its
 * subscriptions, or create it explicitly must be rejected with the expected error messages.
 */
@Test
public void testCreateTransactionSystemTopic() throws Exception {
    final String subName = "test";
    final String userTopic = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();

    try {
        // A transactional ack on the subscription is issued first ("init pending ack" in the
        // original author's words).
        @Cleanup
        Consumer<byte[]> consumer = getConsumer(userTopic, subName);
        Transaction txn = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS)
                .build()
                .get();
        MessageIdImpl arbitraryId = new MessageIdImpl(10, 10, 10);
        consumer.acknowledgeAsync(arbitraryId, txn).get();
    } catch (ExecutionException e) {
        assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
    }

    final String pendingAckTopic = MLPendingAckStore.getTransactionPendingAckStoreSuffix(userTopic, subName);

    // The namespace listing must not expose the transaction system topic.
    List<String> listedTopics = admin.topics().getList(NAMESPACE1);
    assertEquals(listedTopics.size(), 2);
    listedTopics.forEach(listed -> assertFalse(listed.contains(PENDING_ACK_STORE_SUFFIX)));

    // Subscribing to the transaction system topic is not allowed.
    try {
        @Cleanup
        Consumer<byte[]> rejectedConsumer = getConsumer(pendingAckTopic, subName);
        fail();
    } catch (PulsarClientException.NotAllowedException e) {
        assertTrue(e.getMessage().contains("Can not create transaction system topic"));
    }

    // Listing subscriptions of the transaction system topic is rejected as well.
    try {
        admin.topics().getSubscriptions(pendingAckTopic);
        fail();
    } catch (PulsarAdminException e) {
        assertEquals(e.getMessage(), "Can not create transaction system topic " + pendingAckTopic);
    }

    // Creating it as a partitioned topic is rejected.
    try {
        admin.topics().createPartitionedTopic(pendingAckTopic, 3);
        fail();
    } catch (PulsarAdminException e) {
        assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
    }

    // Creating it as a non-partitioned topic is rejected.
    try {
        admin.topics().createNonPartitionedTopic(pendingAckTopic);
        fail();
    } catch (PulsarAdminException e) {
        assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
    }
}
/**
 * After unloading the system namespace and NAMESPACE1 and re-subscribing, waits until a new
 * transaction can be built again and then asserts that the broker's topic map holds no entry for
 * the transaction log topic, the coordinator-assign partition, or the pending-ack topic.
 */
@Test
public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
    final String subName = "test";
    final String topicName = TopicName.get(NAMESPACE1 + "/test").toString();

    @Cleanup
    Consumer<byte[]> firstConsumer = getConsumer(topicName, subName);
    firstConsumer.close();

    // Poll until a transaction can be built.
    Awaitility.await().until(() -> {
        try {
            pulsarClient.newTransaction()
                    .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
            return true;
        } catch (Exception e) {
            return false;
        }
    });

    admin.namespaces().unload(NamespaceName.SYSTEM_NAMESPACE.toString());
    admin.namespaces().unload(NAMESPACE1);

    @Cleanup
    Consumer<byte[]> secondConsumer = getConsumer(topicName, subName);

    // Same polling as above, now after the unloads.
    Awaitility.await().until(() -> {
        try {
            pulsarClient.newTransaction()
                    .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
            return true;
        } catch (Exception e) {
            return false;
        }
    });

    ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> loadedTopics =
            getPulsarServiceList().get(0).getBrokerService().getTopics();

    String txnLogTopic = TopicName.get(TopicDomain.persistent.value(),
            NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString() + 0;
    String coordinatorAssignTopic = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString();
    String pendingAckTopic = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);

    Assert.assertNull(loadedTopics.get(txnLogTopic));
    Assert.assertNull(loadedTopics.get(coordinatorAssignTopic));
    Assert.assertNull(loadedTopics.get(pendingAckTopic));
}
/**
 * Subscribes to {@code topicName} through {@code pulsarClient} with a Shared subscription named
 * {@code subName} and batch-index acknowledgment enabled.
 *
 * @param topicName topic to subscribe to
 * @param subName   subscription name
 * @return the subscribed consumer
 * @throws PulsarClientException if the subscribe call fails
 */
public Consumer<byte[]> getConsumer(String topicName, String subName) throws PulsarClientException {
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .subscriptionType(SubscriptionType.Shared)
            .enableBatchIndexAcknowledgment(true)
            .subscriptionName(subName)
            .topic(topicName)
            .subscribe();
    return consumer;
}
/**
 * Builds two transactions back to back and asserts the ids they report: least-significant bits
 * 0 and then 1, most-significant bits 0 in both cases.
 */
@Test
public void testGetTxnID() throws Exception {
    Transaction firstTxn = pulsarClient.newTransaction().build().get();
    TxnID firstId = firstTxn.getTxnID();
    Assert.assertEquals(firstId.getLeastSigBits(), 0);
    Assert.assertEquals(firstId.getMostSigBits(), 0);

    // The abort is issued without waiting for its future.
    firstTxn.abort();

    Transaction secondTxn = pulsarClient.newTransaction().build().get();
    TxnID secondId = secondTxn.getTxnID();
    Assert.assertEquals(secondId.getLeastSigBits(), 1);
    Assert.assertEquals(secondId.getMostSigBits(), 0);
}
/**
 * Creates a topic, wipes the broker's in-memory topic map and the managed-ledger factory's cached
 * ledger entry for it, verifies that creating the topic again is reported as a conflict, then sets
 * retention on the topic, subscribes, and compares the retention size seen on the pending-ack
 * managed ledger with the configured values.
 *
 * <p>NOTE(review): every retention assertion below runs inside {@code thenAccept} callbacks whose
 * returned futures are discarded and never joined. An {@code AssertionError} thrown there is
 * captured by the discarded future instead of failing the test. Awaiting the chain would enforce
 * the assertions but may change the test outcome — confirm before tightening it.
 */
@Test
public void testSubscriptionRecreateTopic()
        throws PulsarAdminException, NoSuchFieldException, IllegalAccessException, PulsarClientException {
    String topic = "persistent://pulsar/system/testReCreateTopic";
    String subName = "sub_testReCreateTopic";
    int retentionSizeInMbSetTo = 5;
    int retentionSizeInMbSetTopic = 6;
    int retentionSizeInMinutesSetTo = 5;
    int retentionSizeInMinutesSetTopic = 6;
    admin.topics().createNonPartitionedTopic(topic);
    PulsarService pulsarService = super.getPulsarServiceList().get(0);
    // Forget the topic in the broker's topic map ...
    pulsarService.getBrokerService().getTopics().clear();
    // ... and in the managed-ledger factory's private "ledgers" map (reached via reflection).
    ManagedLedgerFactory managedLedgerFactory = pulsarService.getBrokerService().getManagedLedgerFactory();
    Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
    field.setAccessible(true);
    @SuppressWarnings("unchecked") // the reflective read yields Object; the cast mirrors the declared use here
    ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers =
            (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) field.get(managedLedgerFactory);
    ledgers.remove(TopicName.get(topic).getPersistenceNamingEncoding());
    try {
        admin.topics().createNonPartitionedTopic(topic);
        Assert.fail();
    } catch (PulsarAdminException.ConflictException e) {
        log.info("Cann`t create topic again");
    }
    admin.topics().setRetention(topic,
            new RetentionPolicies(retentionSizeInMinutesSetTopic, retentionSizeInMbSetTopic));
    // Close the consumer when the method exits, as the other tests in this class do.
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
            .subscriptionName(subName)
            .subscribe();
    pulsarService.getBrokerService().getTopicIfExists(topic).thenAccept(option -> {
        if (!option.isPresent()) {
            log.error("Failed o get Topic named: {}", topic);
            Assert.fail();
        }
        PersistentTopic originPersistentTopic = (PersistentTopic) option.get();
        String pendingAckTopicName = MLPendingAckStore
                .getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subName);
        try {
            admin.topics().setRetention(pendingAckTopicName,
                    new RetentionPolicies(retentionSizeInMinutesSetTo, retentionSizeInMbSetTo));
        } catch (PulsarAdminException e) {
            log.error("Failed to get./setRetention of topic with Exception:" + e);
            Assert.fail();
        }
        PersistentSubscription subscription = originPersistentTopic
                .getSubscription(subName);
        subscription.getPendingAckManageLedger().thenAccept(managedLedger -> {
            long retentionSize = managedLedger.getConfig().getRetentionSizeInMB();
            if (!originPersistentTopic.getTopicPolicies().isPresent()) {
                log.error("Failed to getTopicPolicies of :" + originPersistentTopic);
                Assert.fail();
            }
            // The existing pending-ack ledger is expected to carry the topic-level retention size.
            Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize);
            MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider();
            CompletableFuture<PendingAckStore> future = mlPendingAckStoreProvider.newPendingAckStore(subscription);
            future.thenAccept(pendingAckStore -> {
                        // A freshly created store is expected to carry the size set on the pending-ack topic.
                        ((MLPendingAckStore) pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> {
                            Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
                                    retentionSizeInMbSetTo);
                        });
                    }
            );
        });
    });
}
/**
 * Reads the namespace's transaction-buffer snapshot topic and checks that:
 * no snapshot is available before a producer is built, a snapshot with max-read-position entry
 * id -1 appears once a producer is created, and a snapshot with entry id 1 appears after two
 * ordinary messages have been published.
 *
 * <p>The reader and the producer are closed on exit via {@code @Cleanup}, whether the assertions
 * pass or fail.
 */
@Test
public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception {
    String topic = "persistent://" + NAMESPACE1 + "/testSnapShot";
    admin.topics().createNonPartitionedTopic(topic);
    // Resolve the topic on the broker; the call (and its implicit "must be present" check) is
    // kept, only the unused local reference was dropped.
    getPulsarServiceList().get(0)
            .getBrokerService().getTopic(topic, false)
            .get().get();
    ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
            .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
            .startMessageId(MessageId.latest)
            .topic(NAMESPACE1 + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
    @Cleanup
    Reader<TransactionBufferSnapshot> reader = readerBuilder.create();
    long waitSnapShotTime = getPulsarServiceList().get(0).getConfiguration()
            .getTransactionBufferSnapshotMinTimeInMillis();
    Awaitility.await().atMost(waitSnapShotTime * 2, TimeUnit.MILLISECONDS)
            .untilAsserted(() -> Assert.assertFalse(reader.hasMessageAvailable()));
    // Test that building a producer with the transaction-enabled client takes a snapshot.
    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS)
            .topic(topic).enableBatching(true)
            .create();
    Awaitility.await().untilAsserted(() -> {
        Message<TransactionBufferSnapshot> message1 = reader.readNext();
        TransactionBufferSnapshot snapshot1 = message1.getValue();
        Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), -1);
    });
    // Test that publishing normal messages produces a newer snapshot.
    producer.newMessage(Schema.STRING).value("common message send").send();
    producer.newMessage(Schema.STRING).value("common message send").send();
    Awaitility.await().untilAsserted(() -> {
        Message<TransactionBufferSnapshot> message1 = reader.readNext();
        TransactionBufferSnapshot snapshot1 = message1.getValue();
        Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 1);
    });
}
/**
 * Publishes a transactional message to a topic whose managed ledger has been closed and checks
 * that the publish callback receives an exception whose cause is
 * {@code ManagedLedgerAlreadyClosedException}.
 *
 * <p>The callback only records the exception and releases the latch; all assertions run on the
 * test thread. That way a wrong or missing exception fails the test with a descriptive message
 * instead of surfacing as an unexplained wait timeout, and no helper thread is left blocked on
 * the latch.
 */
@Test
public void testAppendBufferWithNotManageLedgerExceptionCanCastToMLE()
        throws Exception {
    String topic = "persistent://pulsar/system/testReCreateTopic";
    admin.topics().createNonPartitionedTopic(topic);
    PersistentTopic persistentTopic =
            (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
                    .getTopic(topic, false)
                    .get().get();
    CountDownLatch countDownLatch = new CountDownLatch(1);
    // Exception handed to the publish callback; inspected on the test thread after the latch opens.
    AtomicReference<Exception> publishFailure = new AtomicReference<>();
    Topic.PublishContext publishContext = new Topic.PublishContext() {
        @Override
        public String getProducerName() {
            return "test";
        }
        public long getSequenceId() {
            return 30;
        }
        /**
         * Return the producer name for the original producer.
         *
         * For messages published locally, this will return the same local producer name, though in case of
         * replicated messages, the original producer name will differ
         */
        public String getOriginalProducerName() {
            return "test";
        }
        public long getOriginalSequenceId() {
            return 30;
        }
        public long getHighestSequenceId() {
            return 30;
        }
        public long getOriginalHighestSequenceId() {
            return 30;
        }
        public long getNumberOfMessages() {
            return 30;
        }
        @Override
        public void completed(Exception e, long ledgerId, long entryId) {
            // Never assert here: an AssertionError on the callback thread would skip countDown().
            publishFailure.set(e);
            countDownLatch.countDown();
        }
    };
    // Close the topic's managed ledger.
    persistentTopic.getManagedLedger().close();
    // Publish to the closed managed ledger to provoke a ManagedLedgerException.
    persistentTopic.publishTxnMessage(new TxnID(123L, 321L),
            Unpooled.copiedBuffer("message", UTF_8), publishContext);
    // 10 seconds mirrors the default wait of the Awaitility call this replaces.
    assertTrue(countDownLatch.await(10, TimeUnit.SECONDS),
            "publish callback was not invoked within 10 seconds");
    Exception failure = publishFailure.get();
    Assert.assertNotNull(failure, "publish on a closed managed ledger completed without an exception");
    assertTrue(failure.getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException,
            "unexpected cause: " + failure.getCause());
}
/**
 * Tracks the transaction buffer's max read position on one topic through several phases:
 * normal publishes while the buffer reports "no snapshot", a transactional publish that is still
 * open, the commit of that transaction, a normal publish once the buffer is ready with no open
 * transaction, and finally a normal publish after the buffer state has been forced to
 * {@code Initializing} via reflection.
 *
 * <p>The extra non-transactional client and both producers are closed on exit via
 * {@code @Cleanup}.
 */
@Test
public void testMaxReadPositionForNormalPublish() throws Exception {
    String topic = "persistent://" + NAMESPACE1 + "/NormalPublish";
    admin.topics().createNonPartitionedTopic(topic);
    PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
            .getTopic(topic, false).get().get();
    TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
    @Cleanup
    PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false)
            .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()).build();
    // The transaction buffer should report NoSnapshot as long as only a client without
    // transactions enabled has built a producer.
    @Cleanup
    Producer<String> normalProducer = noTxnClient.newProducer(Schema.STRING)
            .producerName("testNormalPublish")
            .topic(topic)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();
    Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfNoSnapshot()));
    // Publishing a normal message moves maxReadPosition while in the NoSnapshot state.
    MessageIdImpl messageId = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
    PositionImpl position = topicTransactionBuffer.getMaxReadPosition();
    Assert.assertEquals(position.getLedgerId(), messageId.getLedgerId());
    Assert.assertEquals(position.getEntryId(), messageId.getEntryId());
    // The buffer should become Ready once the transaction-enabled client builds a producer.
    @Cleanup
    Producer<String> txnProducer = pulsarClient.newProducer(Schema.STRING)
            .producerName("testTransactionPublish")
            .topic(topic)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();
    Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
    // A transactional publish must not move maxReadPosition while the transaction is open.
    Transaction transaction = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
    txnProducer.newMessage(transaction).value("txn message").send();
    PositionImpl position1 = topicTransactionBuffer.getMaxReadPosition();
    Assert.assertEquals(position1.getLedgerId(), messageId.getLedgerId());
    Assert.assertEquals(position1.getEntryId(), messageId.getEntryId());
    // Neither does a normal publish while that transaction is still open.
    MessageIdImpl messageId2 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
    PositionImpl position2 = topicTransactionBuffer.getMaxReadPosition();
    Assert.assertEquals(position2.getLedgerId(), messageId.getLedgerId());
    Assert.assertEquals(position2.getEntryId(), messageId.getEntryId());
    transaction.commit().get();
    // After the commit the position is expected one entry past the last normal message.
    PositionImpl position3 = topicTransactionBuffer.getMaxReadPosition();
    Assert.assertEquals(position3.getLedgerId(), messageId2.getLedgerId());
    Assert.assertEquals(position3.getEntryId(), messageId2.getEntryId() + 1);
    // A normal publish moves maxReadPosition when the buffer is Ready and no transaction is ongoing.
    MessageIdImpl messageId4 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
    PositionImpl position4 = topicTransactionBuffer.getMaxReadPosition();
    Assert.assertEquals(position4.getLedgerId(), messageId4.getLedgerId());
    Assert.assertEquals(position4.getEntryId(), messageId4.getEntryId());
    // A normal publish must not move maxReadPosition when the buffer state is Initializing.
    // Both the state and the raw maxReadPosition field are accessed through reflection.
    @SuppressWarnings("unchecked") // getSuperclass() is wildcard-typed; the cast mirrors the field lookup below
    Class<TopicTransactionBufferState> transactionBufferStateClass =
            (Class<TopicTransactionBufferState>) topicTransactionBuffer.getClass().getSuperclass();
    Field field = transactionBufferStateClass.getDeclaredField("state");
    field.setAccessible(true);
    Class<TopicTransactionBuffer> topicTransactionBufferClass = TopicTransactionBuffer.class;
    Field maxReadPositionField = topicTransactionBufferClass.getDeclaredField("maxReadPosition");
    maxReadPositionField.setAccessible(true);
    field.set(topicTransactionBuffer, TopicTransactionBufferState.State.Initializing);
    normalProducer.newMessage().value("normal message").send();
    PositionImpl position5 = (PositionImpl) maxReadPositionField.get(topicTransactionBuffer);
    Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
    Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());
}
/**
 * Replaces the managed ledger's "transaction-buffer-sub" cursor with a mock whose
 * {@code asyncReadEntries} always fails, constructs new {@code TopicTransactionBuffer} instances
 * on top of it, and waits for each of them to report the state "Ready".
 */
@Test
public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
    String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
    admin.topics().createNonPartitionedTopic(topic);
    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .producerName("test")
            .enableBatching(false)
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(topic)
            .create();
    // One transactional message is published; the transaction is neither committed nor aborted here.
    Transaction txn = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
    producer.newMessage(txn).value("test").send();
    PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
            .getTopic("persistent://" + topic, false).get().get();
    persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
    // Mock cursor that claims to have entries but fails every read.
    ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
    doReturn("transaction-buffer-sub").when(managedCursor).getName();
    doReturn(true).when(managedCursor).hasMoreEntries();
    // First stubbing: reads fail with NonRecoverableLedgerException.
    doAnswer(invocation -> {
        AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
        callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
                null);
        return null;
    }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
    // Swap the real cursor in the ledger's private "cursors" container for the mock.
    Class<ManagedLedgerImpl> managedLedgerClass = ManagedLedgerImpl.class;
    Field field = managedLedgerClass.getDeclaredField("cursors");
    field.setAccessible(true);
    ManagedCursorContainer managedCursors = (ManagedCursorContainer) field.get(persistentTopic.getManagedLedger());
    managedCursors.removeCursor("transaction-buffer-sub");
    managedCursors.add(managedCursor);
    // Second stubbing: reads fail with ManagedLedgerFencedException.
    // NOTE(review): no TopicTransactionBuffer is constructed in this method between the first
    // stubbing and this one, so the NonRecoverableLedgerException answer is replaced before any
    // buffer built here can read through it — confirm whether a buffer/await step for that case
    // is missing.
    doAnswer(invocation -> {
        AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
        callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
        return null;
    }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
    TransactionBuffer buffer2 = new TopicTransactionBuffer(persistentTopic);
    Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
            assertEquals(buffer2.getStats(false).state, "Ready"));
    managedCursors.removeCursor("transaction-buffer-sub");
    // Third stubbing: reads fail with CursorAlreadyClosedException.
    doAnswer(invocation -> {
        AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
        callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
        return null;
    }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
    managedCursors.add(managedCursor);
    TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
    Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
            assertEquals(buffer3.getStats(false).state, "Ready"));
    // NOTE(review): the future returned by thenAccept is discarded, so a failure of this
    // assertion is captured by that future and does not fail the test — confirm whether it
    // should be awaited.
    persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
        assertTrue(internalStats.cursors.isEmpty());
    });
    // Remove the mock cursor again before the method returns.
    managedCursors.removeCursor("transaction-buffer-sub");
}
/**
 * Installs a mocked {@code TransactionPendingAckStoreProvider} that hands out an
 * {@code MLPendingAckStore} backed by a cursor whose {@code asyncReadEntries} always fails, then
 * constructs three {@code PendingAckHandleImpl} instances — one per failure type — and waits for
 * each to report the state "Ready".
 *
 * <p>The timer is stopped through {@code @Cleanup("stop")}, so its worker thread is released even
 * when one of the waits fails.
 *
 * <p>NOTE(review): the mocked provider is written into the {@code PulsarService} field by
 * reflection and not restored in this method — confirm that later tests do not depend on the
 * real provider.
 */
@Test
public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
    TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
    @Cleanup("stop")
    HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
            1, TimeUnit.MILLISECONDS);
    String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
    admin.topics().createNonPartitionedTopic(topic);
    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .producerName("test")
            .enableBatching(false)
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(topic)
            .create();
    producer.newMessage().send();
    PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
            .getTopic(topic, false).get().get();
    persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
    PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
            .createSubscription("test",
                    CommandSubscribe.InitialPosition.Earliest, false, null).get();
    // Mock cursor: open, claims to have entries, but every read fails.
    ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
    doReturn(true).when(managedCursor).hasMoreEntries();
    doReturn(false).when(managedCursor).isClosed();
    doReturn(new PositionImpl(-1, -1)).when(managedCursor).getMarkDeletedPosition();
    // Case 1: reads fail with NonRecoverableLedgerException.
    doAnswer(invocation -> {
        AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
        callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
                null);
        return null;
    }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
    // Provider mock that always returns a store built on the failing cursor.
    TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
    doReturn(CompletableFuture.completedFuture(
            new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null,
                    500, bufferedWriterConfig, transactionTimer)))
            .when(pendingAckStoreProvider).newPendingAckStore(any());
    doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());
    // Inject the provider mock into the broker's PulsarService via reflection.
    Class<PulsarService> pulsarServiceClass = PulsarService.class;
    Field field = pulsarServiceClass.getDeclaredField("transactionPendingAckStoreProvider");
    field.setAccessible(true);
    field.set(getPulsarServiceList().get(0), pendingAckStoreProvider);
    PendingAckHandleImpl pendingAckHandle1 = new PendingAckHandleImpl(persistentSubscription);
    Awaitility.await().untilAsserted(() ->
            assertEquals(pendingAckHandle1.getStats(false).state, "Ready"));
    // Case 2: reads fail with ManagedLedgerFencedException.
    doAnswer(invocation -> {
        AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
        callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
        return null;
    }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
    PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
    Awaitility.await().untilAsserted(() ->
            assertEquals(pendingAckHandle2.getStats(false).state, "Ready"));
    // Case 3: reads fail with CursorAlreadyClosedException.
    doAnswer(invocation -> {
        AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
        callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
        return null;
    }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
    PendingAckHandleImpl pendingAckHandle3 = new PendingAckHandleImpl(persistentSubscription);
    Awaitility.await().untilAsserted(() ->
            assertEquals(pendingAckHandle3.getStats(false).state, "Ready"));
}
/**
 * Verifies that transaction-coordinator recovery ends in the {@code "Ready"} state even when the
 * transaction-log cursor cannot be read.
 *
 * <p>A mocked {@link ManagedCursor} is injected into {@link MLTransactionLogImpl} via reflection and made to
 * fail every {@code asyncReadEntries} call, in turn, with {@code NonRecoverableLedgerException},
 * {@code ManagedLedgerFencedException} and {@code CursorAlreadyClosedException}. For each failure a fresh
 * {@link MLTransactionMetadataStore} is initialised and its coordinator stats are polled until they report
 * {@code "Ready"}.
 *
 * @throws Exception if topic creation, reflection or store initialisation fails
 */
@Test
public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
    HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
            1, TimeUnit.MILLISECONDS);
    // The timer owns a worker thread; stop it even when an assertion below fails.
    try {
        String topic = NAMESPACE1 + "/testEndTCRecoveringWhenManagerLedgerDisReadable";
        admin.topics().createNonPartitionedTopic(topic);
        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
                .getTopic(topic, false).get().get();
        persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
        Map<String, String> map = new HashMap<>();
        map.put(MLTransactionSequenceIdGenerator.MAX_LOCAL_TXN_ID, "1");
        persistentTopic.getManagedLedger().setProperties(map);

        // Cursor that always claims to have entries but fails every read.
        ManagedCursor managedCursor = mock(ManagedCursor.class);
        doReturn(true).when(managedCursor).hasMoreEntries();
        doAnswer(invocation -> {
            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
            callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
                    null);
            return null;
        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
        persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
        MLTransactionLogImpl mlTransactionLog =
                new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
                        persistentTopic.getManagedLedger().getConfig(), new TxnLogBufferedWriterConfig(),
                        transactionTimer);

        // Inject the failing cursor and the topic's managed ledger into the transaction log.
        Class<MLTransactionLogImpl> mlTransactionLogClass = MLTransactionLogImpl.class;
        Field field = mlTransactionLogClass.getDeclaredField("cursor");
        field.setAccessible(true);
        field.set(mlTransactionLog, managedCursor);
        field = mlTransactionLogClass.getDeclaredField("managedLedger");
        field.setAccessible(true);
        field.set(mlTransactionLog, persistentTopic.getManagedLedger());

        TransactionRecoverTracker transactionRecoverTracker = mock(TransactionRecoverTracker.class);
        doNothing().when(transactionRecoverTracker).appendOpenTransactionToTimeoutTracker();
        doNothing().when(transactionRecoverTracker).handleCommittingAndAbortingTransaction();
        TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class);
        doNothing().when(timeoutTracker).start();

        // Case 1: NonRecoverableLedgerException.
        MLTransactionMetadataStore metadataStore1 =
                new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
        metadataStore1.init(transactionRecoverTracker).get();
        Awaitility.await().untilAsserted(() ->
                assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));

        // Case 2: ManagedLedgerFencedException.
        doAnswer(invocation -> {
            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
            callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
            return null;
        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
        MLTransactionMetadataStore metadataStore2 =
                new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
        metadataStore2.init(transactionRecoverTracker).get();
        Awaitility.await().untilAsserted(() ->
                assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));

        // Case 3: CursorAlreadyClosedException.
        doAnswer(invocation -> {
            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
            callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
            return null;
        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
        MLTransactionMetadataStore metadataStore3 =
                new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
        metadataStore3.init(transactionRecoverTracker).get();
        Awaitility.await().untilAsserted(() ->
                assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));
    } finally {
        // cleanup.
        transactionTimer.stop();
    }
}
/**
 * Opens two transactions, forces their client-side state to COMMITTING and ABORTING through reflection,
 * then ends them and checks the counters recorded by the broker's {@link CounterBrokerInterceptor}.
 *
 * @throws Exception if a transaction cannot be created or ended, or reflection fails
 */
@Test
public void testEndTxnWhenCommittingOrAborting() throws Exception {
    CounterBrokerInterceptor interceptor =
            (CounterBrokerInterceptor) getPulsarServiceList().get(0).getBrokerInterceptor();
    interceptor.reset();

    Transaction txnToCommit = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.SECONDS)
            .build()
            .get();
    Transaction txnToAbort = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.SECONDS)
            .build()
            .get();

    // Overwrite the private "state" field of both client transactions.
    Field stateField = TransactionImpl.class.getDeclaredField("state");
    stateField.setAccessible(true);
    stateField.set(txnToCommit, TransactionImpl.State.COMMITTING);
    stateField.set(txnToAbort, TransactionImpl.State.ABORTING);

    // Both transactions were counted when they were opened.
    assertEquals(interceptor.getTxnCount(), 2);

    txnToAbort.abort().get();
    assertEquals(interceptor.getAbortedTxnCount(), 1);

    txnToCommit.commit().get();
    assertEquals(interceptor.getCommittedTxnCount(), 1);
}
/**
 * Checks that a new {@link TopicTransactionBuffer} becomes ready when recovery is handed a cursor that
 * reports no entries.
 *
 * <p>The topic's managed ledger is replaced by a Mockito spy, a snapshot is taken through the private
 * {@code takeSnapshot} method, and the spy is then stubbed to return {@code PositionImpl.LATEST} as the last
 * confirmed entry and a mocked cursor whose {@code hasMoreEntries()} is {@code false}.
 *
 * @throws Exception if topic lookup, reflection or the snapshot fails
 */
@Test
public void testNoEntryCanBeReadWhenRecovery() throws Exception {
    String topic = NAMESPACE1 + "/test";
    PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0)
            .getBrokerService()
            .getTopic(TopicName.get(topic).toString(), true)
            .get()
            .get();

    Field ledgerField = PersistentTopic.class.getDeclaredField("ledger");
    Field bufferField = PersistentTopic.class.getDeclaredField("transactionBuffer");
    ledgerField.setAccessible(true);
    bufferField.setAccessible(true);

    // Swap the real managed ledger for a spy so selected calls can be stubbed below.
    ManagedLedgerImpl spiedLedger = (ManagedLedgerImpl) spy(ledgerField.get(persistentTopic));
    ledgerField.set(persistentTopic, spiedLedger);

    // Take a snapshot on the topic's current buffer via its private method.
    TopicTransactionBuffer currentBuffer = (TopicTransactionBuffer) bufferField.get(persistentTopic);
    Method takeSnapshot = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
    takeSnapshot.setAccessible(true);
    CompletableFuture<Void> snapshotFuture = (CompletableFuture<Void>) takeSnapshot.invoke(currentBuffer);
    snapshotFuture.get();

    doReturn(PositionImpl.LATEST).when(spiedLedger).getLastConfirmedEntry();
    ManagedCursorImpl emptyCursor = mock(ManagedCursorImpl.class);
    doReturn(false).when(emptyCursor).hasMoreEntries();
    doReturn(emptyCursor).when(spiedLedger).newNonDurableCursor(any(), any());

    TopicTransactionBuffer recoveredBuffer = new TopicTransactionBuffer(persistentTopic);
    Awaitility.await().untilAsserted(() -> Assert.assertTrue(recoveredBuffer.checkIfReady()));
}
/**
 * Checks that a commit does not complete while the transaction metadata stores are in the
 * {@code Initializing} state, and completes once they are switched back to {@code Ready}.
 *
 * <p>The store state is changed through reflection on the private {@code state} field of
 * {@link TransactionMetadataStoreState}. Any reflection failure propagates and fails the test immediately
 * instead of being printed and ignored.
 *
 * @throws Exception if the transaction cannot be created, reflection fails, or the final commit does not
 *                   finish within 5 seconds
 */
@Test
public void testRetryExceptionOfEndTxn() throws Exception{
    Transaction transaction = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS)
            .build()
            .get();

    // Resolve the private field once; it is the same for every store.
    Field stateField = TransactionMetadataStoreState.class.getDeclaredField("state");
    stateField.setAccessible(true);

    // Field.set only needs an Object reference, so the stores are iterated as plain objects.
    for (Object store : getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores()
            .values()) {
        stateField.set(store, TransactionMetadataStoreState.State.Initializing);
    }

    CompletableFuture<Void> completableFuture = transaction.commit();
    try {
        completableFuture.get(5, TimeUnit.SECONDS);
        fail("Commit must not complete while the transaction metadata stores are Initializing");
    } catch (TimeoutException ignored) {
        // Expected: the commit is still pending after 5 seconds.
    }

    for (Object store : getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores()
            .values()) {
        stateField.set(store, TransactionMetadataStoreState.State.Ready);
    }

    // The same, still-pending commit future must now complete.
    completableFuture.get(5, TimeUnit.SECONDS);
}
/**
 * Asserts that the client-side {@code timeout} task held by a {@link TransactionImpl} is cancelled once the
 * transaction has been committed, and likewise once a transaction has been aborted.
 *
 * @throws Exception if a transaction cannot be created or ended, or reflection fails
 */
@Test
public void testCancelTxnTimeout() throws Exception{
    Field timeoutField = TransactionImpl.class.getDeclaredField("timeout");
    timeoutField.setAccessible(true);

    // Commit path.
    Transaction committedTxn = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS)
            .build()
            .get();
    committedTxn.commit().get();
    Timeout commitTimeout = (Timeout) timeoutField.get(committedTxn);
    Assert.assertTrue(commitTimeout.isCancelled());

    // Abort path.
    Transaction abortedTxn = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS)
            .build()
            .get();
    abortedTxn.abort().get();
    Timeout abortTimeout = (Timeout) timeoutField.get(abortedTxn);
    Assert.assertTrue(abortTimeout.isCancelled());
}
/**
 * Asserts that the {@code changeMaxReadPositionAndAddAbortTimes} counter of a transaction buffer stays at
 * zero when {@code syncMaxReadPositionForNormalPublish} is called while the buffer is in the
 * {@code NoSnapshot} state.
 *
 * @throws Exception if topic lookup or reflection fails
 */
@Test
public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
    String topicName = NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID();
    PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
            .getBrokerService()
            .getTopic(topicName, true)
            .get().get();
    TransactionBuffer buffer = persistentTopic.getTransactionBuffer();

    // Private counter on the buffer implementation.
    Field counterField = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
    counterField.setAccessible(true);
    AtomicLong counter = (AtomicLong) counterField.get(buffer);

    // Private state field declared on the buffer's state base class.
    Field stateField = TopicTransactionBufferState.class.getDeclaredField("state");
    stateField.setAccessible(true);

    // Wait until the buffer reports NoSnapshot.
    Awaitility.await().untilAsserted(() -> {
        TopicTransactionBufferState.State current =
                (TopicTransactionBufferState.State) stateField.get(buffer);
        Assert.assertEquals(current, TopicTransactionBufferState.State.NoSnapshot);
    });

    Assert.assertEquals(counter.get(), 0L);
    buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
    Assert.assertEquals(counter.get(), 0L);
}
/**
 * Checks that schemas exist for the transaction-buffer-snapshot and topic-policy system topics of a new
 * namespace while {@code allowAutoUpdateSchemaEnabled} is switched off on every broker.
 *
 * <p>The broker setting is re-enabled in a {@code finally} block so that a failing assertion does not leave
 * the shared brokers misconfigured.
 *
 * @throws Exception if the namespace or topic cannot be created
 */
@Test
public void testAutoCreateSchemaForTransactionSnapshot() throws Exception {
    String namespace = TENANT + "/ns2";
    String topic = namespace + "/test";
    try {
        pulsarServiceList.forEach((pulsarService ->
                pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(false)));
        admin.namespaces().createNamespace(namespace);
        admin.topics().createNonPartitionedTopic(topic);
        TopicName transactionBufferTopicName =
                NamespaceEventsSystemTopicFactory.getSystemTopicName(
                        TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
        TopicName topicPolicyTopicName =
                NamespaceEventsSystemTopicFactory.getSystemTopicName(
                        TopicName.get(topic).getNamespaceObject(), EventType.TOPIC_POLICY);
        // Both system topics must end up with a schema.
        Awaitility.await().untilAsserted(() -> {
            SchemaInfo snapshotSchema = admin
                    .schemas()
                    .getSchemaInfo(transactionBufferTopicName.toString());
            Assert.assertNotNull(snapshotSchema);
            SchemaInfo topicPolicySchema = admin
                    .schemas()
                    .getSchemaInfo(topicPolicyTopicName.toString());
            Assert.assertNotNull(topicPolicySchema);
        });
    } finally {
        // Restore the setting on every broker, whether or not the assertions passed.
        pulsarServiceList.forEach((pulsarService ->
                pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
    }
}
/**
 * Checks how far the mark-delete position of the pending-ack log cursor ({@code __pending_ack_state})
 * trails the log's last confirmed entry after one committed and one still-open transactional ack.
 *
 * <p>The test lowers {@code transactionPendingAckLogIndexMinLag} to 1 and sets
 * {@code managedLedgerDefaultMarkDeleteRateLimit} to 5 on the first broker; these values are not restored.
 *
 * @throws Exception if producing, consuming or a transaction operation fails
 */
@Test
public void testPendingAckMarkDeletePosition() throws Exception {
    getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(1);
    getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(5);
    String topic = NAMESPACE1 + "/test1";
    @Cleanup
    Producer<byte[]> producer = pulsarClient
            .newProducer(Schema.BYTES)
            .topic(topic)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient
            .newConsumer()
            .topic(topic)
            .subscriptionName("sub")
            .subscribe();

    // First transaction: ack message1 and commit.
    producer.newMessage()
            .value("test".getBytes(UTF_8))
            .send();
    Transaction transaction = pulsarClient
            .newTransaction()
            .withTransactionTimeout(5, TimeUnit.MINUTES)
            .build().get();
    Message<byte[]> message1 = consumer.receive(10, TimeUnit.SECONDS);
    // Fail with a clear message instead of an NPE if nothing arrives in time.
    Assert.assertNotNull(message1, "message1 was not received within 10 seconds");
    consumer.acknowledgeAsync(message1.getMessageId(), transaction);
    transaction.commit().get();

    // Second transaction: ack message2 but leave the transaction open, then check whether the
    // pending-ack log's mark-delete position moved after the new entry was appended.
    transaction = pulsarClient
            .newTransaction()
            .withTransactionTimeout(5, TimeUnit.MINUTES)
            .build().get();
    producer.newMessage()
            .value("test".getBytes(UTF_8))
            .send();
    Message<byte[]> message2 = consumer.receive(10, TimeUnit.SECONDS);
    Assert.assertNotNull(message2, "message2 was not received within 10 seconds");
    consumer.acknowledgeAsync(message2.getMessageId(), transaction);

    Awaitility.await().untilAsserted(() -> {
        ManagedLedgerInternalStats managedLedgerInternalStats = admin
                .transactions()
                .getPendingAckInternalStats(topic, "sub", false)
                .pendingAckLogStats
                .managedLedgerInternalStats;
        // Positions are formatted as "<ledgerId>:<entryId>".
        String[] markDeletePosition = managedLedgerInternalStats.cursors.get("__pending_ack_state")
                .markDeletePosition.split(":");
        String[] lastConfirmedEntry = managedLedgerInternalStats.lastConfirmedEntry.split(":");
        Assert.assertEquals(markDeletePosition[0], lastConfirmedEntry[0]);
        // The mark-delete position must not cover the commit mark or the uncommitted ack of message2,
        // so it is expected to be two entries behind the last confirmed entry.
        Assert.assertEquals(Integer.parseInt(markDeletePosition[1]),
                Integer.parseInt(lastConfirmedEntry[1]) - 2);
    });
}
/**
 * Closes the cursor of coordinator 0's transaction log (reached through reflection) and then opens and
 * commits a new transaction, expecting the commit future to complete normally.
 *
 * @throws Exception if reflection fails, the cursor cannot be closed, or the commit fails
 */
@Test
public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
    TransactionMetadataStore coordinatorStore = getPulsarServiceList().get(0)
            .getTransactionMetadataStoreService()
            .getStores()
            .get(new TransactionCoordinatorID(0));

    // store -> transactionLog
    Field logField = MLTransactionMetadataStore.class.getDeclaredField("transactionLog");
    logField.setAccessible(true);
    MLTransactionLogImpl txnLog = (MLTransactionLogImpl) logField.get(coordinatorStore);

    // transactionLog -> cursor
    Field cursorField = MLTransactionLogImpl.class.getDeclaredField("cursor");
    cursorField.setAccessible(true);
    ManagedCursorImpl logCursor = (ManagedCursorImpl) cursorField.get(txnLog);
    logCursor.close();

    Transaction txn = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.SECONDS)
            .build()
            .get();
    txn.commit().get();
}
/**
 * Acknowledges a list of message ids inside a transaction, first with a transaction that is aborted and
 * then with one that is committed.
 *
 * <p>Five messages are published and four are acked as a list. After the abort, a new consumer on the same
 * subscription must receive those four ids again. After the commit, a new consumer must receive exactly one
 * further message whose id is not in the list, and nothing after it.
 *
 * @throws Exception if producing, consuming or a transaction operation fails
 */
@Test(timeOut = 30000)
public void testTransactionAckMessageList() throws Exception {
    String subName = "testSub";
    String topic = "persistent://" + NAMESPACE1 +"/test";
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .sendTimeout(5, TimeUnit.SECONDS)
            .create();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();

    for (int sent = 0; sent < 5; sent++) {
        producer.newMessage().send();
    }

    // Collect the ids of the first four messages.
    List<MessageId> ackedIds = new ArrayList<>();
    for (int received = 0; received < 4; received++) {
        Message<byte[]> next = consumer.receive();
        ackedIds.add(next.getMessageId());
    }

    // Phase 1: ack the list in a transaction that is then aborted.
    Transaction txn = pulsarClient
            .newTransaction()
            .withTransactionTimeout(5, TimeUnit.MINUTES)
            .build()
            .get();
    consumer.acknowledgeAsync(ackedIds, txn);
    txn.abort().get();
    consumer.close();

    consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();
    // The same four ids must be delivered again.
    for (int received = 0; received < 4; received++) {
        Message<byte[]> redelivered = consumer.receive();
        assertTrue(ackedIds.contains(redelivered.getMessageId()));
    }

    // Phase 2: ack the list in a transaction that is then committed.
    txn = pulsarClient
            .newTransaction()
            .withTransactionTimeout(5, TimeUnit.MINUTES)
            .build()
            .get();
    consumer.acknowledgeAsync(ackedIds, txn);
    txn.commit().get();
    consumer.close();

    consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();
    // Only the fifth, never-acked message is left.
    Message<byte[]> remaining = consumer.receive(5, TimeUnit.SECONDS);
    Assert.assertNotNull(remaining);
    Assert.assertFalse(ackedIds.contains(remaining.getMessageId()));
    remaining = consumer.receive(5, TimeUnit.SECONDS);
    Assert.assertNull(remaining);
    consumer.close();
}
/**
 * Acknowledges a {@code Messages} batch inside a transaction on a two-partition topic, first with a
 * transaction that is aborted and then with one that is committed.
 *
 * <p>The {@link MessagesImpl} instance is obtained from the consumer's {@code getNewMessagesImpl} method and
 * its private {@code messageList} field is filled through reflection with four received messages. After the
 * abort, a new consumer must receive those four messages again; after the commit, a new consumer must
 * receive nothing within 5 seconds.
 *
 * @throws Exception if producing, consuming, reflection or a transaction operation fails
 */
@Test(timeOut = 30000)
public void testTransactionAckMessages() throws Exception {
    String topic = "persistent://" + NAMESPACE1 +"/testTransactionAckMessages";
    String subName = "testSub";
    admin.topics().createPartitionedTopic(topic, 2);
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .sendTimeout(5, TimeUnit.SECONDS)
            .create();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();
    for (int i = 0; i < 4; i++) {
        producer.newMessage().send();
    }

    // Build a Messages batch by hand from the four received messages.
    Method method = ConsumerBase.class.getDeclaredMethod("getNewMessagesImpl");
    method.setAccessible(true);
    Field field = MessagesImpl.class.getDeclaredField("messageList");
    field.setAccessible(true);
    // The reflective call returns Object; the cast target is taken from the consumer's own type parameter.
    @SuppressWarnings("unchecked")
    MessagesImpl<byte[]> messages = (MessagesImpl<byte[]>) method.invoke(consumer);
    List<Message<byte[]>> messageList = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
        Message<byte[]> message = consumer.receive();
        messageList.add(message);
    }
    field.set(messages, messageList);

    // Phase 1: ack the batch in a transaction that is then aborted.
    Transaction transaction = pulsarClient
            .newTransaction()
            .withTransactionTimeout(5, TimeUnit.MINUTES)
            .build()
            .get();
    consumer.acknowledgeAsync(messages, transaction);
    transaction.abort().get();
    consumer.close();
    consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();
    List<MessageId> messageIds = new ArrayList<>();
    for (Message<byte[]> acked : messageList) {
        messageIds.add(acked.getMessageId());
    }
    for (int i = 0; i < 4; i++) {
        Message<byte[]> redelivered = consumer.receive(5, TimeUnit.SECONDS);
        // Fail with a clear message instead of an NPE if redelivery does not happen in time.
        Assert.assertNotNull(redelivered, "expected redelivery of message " + i + " after abort");
        assertTrue(messageIds.contains(redelivered.getMessageId()));
    }

    // Phase 2: ack the batch in a transaction that is then committed.
    transaction = pulsarClient
            .newTransaction()
            .withTransactionTimeout(5, TimeUnit.MINUTES)
            .build()
            .get();
    consumer.acknowledgeAsync(messages, transaction);
    transaction.commit().get();
    consumer.close();
    consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();
    // All four messages were acked by the committed transaction, so nothing is left.
    Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
    Assert.assertNull(message);
    consumer.close();
}
/**
 * Invokes the consumer's {@code connectionClosed} method through {@code Whitebox} and asserts that a
 * transactional ack issued afterwards fails with a {@link PulsarClientException.ConnectException}.
 *
 * @throws Exception if producing, consuming or the reflective calls fail
 */
@Test
public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception {
    String topicName = NAMESPACE1 + "/testGetConnectExceptionForAckMsgWhenCnxIsNull";
    @Cleanup
    Producer<byte[]> producer = pulsarClient
            .newProducer(Schema.BYTES)
            .topic(topicName)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient
            .newConsumer()
            .topic(topicName)
            .subscriptionName("sub")
            .subscribe();

    for (int value = 0; value < 10; value++) {
        producer.newMessage().value(Bytes.toBytes(value)).send();
    }

    // Fetch the consumer's current connection and report it as closed to the consumer.
    ClientCnx currentCnx = Whitebox.invokeMethod(consumer, "cnx");
    Whitebox.invokeMethod(consumer, "connectionClosed", currentCnx);

    Message<byte[]> received = consumer.receive();
    Transaction txn = pulsarClient
            .newTransaction()
            .withTransactionTimeout(5, TimeUnit.SECONDS)
            .build().get();

    try {
        consumer.acknowledgeAsync(received.getMessageId(), txn).get();
        fail();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        Assert.assertTrue(cause instanceof PulsarClientException.ConnectException);
    }
}
/**
 * Checks that a batch index whose transactional ack was aborted can be acked again by a later transaction,
 * while another index of the same batch was acked by a committed transaction.
 *
 * <p>The test switches on {@code acknowledgmentAtBatchIndexLevelEnabled} on {@code conf}; the setting is not
 * restored afterwards.
 *
 * @throws Exception if producing, consuming or a transaction operation fails
 */
@Test
public void testPendingAckBatchMessageCommit() throws Exception {
    String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit";
    // enable batch index ack
    conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
    @Cleanup
    Producer<byte[]> producer = pulsarClient
            .newProducer(Schema.BYTES)
            .topic(topic)
            .enableBatching(true)
            // long publish delay so the five sends below are flushed together
            .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient
            .newConsumer()
            .subscriptionType(SubscriptionType.Shared)
            .topic(topic)
            .subscriptionName("sub")
            .subscribe();
    // send batch message, the size is 5
    for (int i = 0; i < 5; i++) {
        // Explicit charset, consistent with the other tests in this file.
        producer.sendAsync(("test" + i).getBytes(UTF_8));
    }
    producer.flush();

    Transaction txn1 = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
    // ack the first message with txn1
    consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn1).get();

    Transaction txn2 = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
    // ack the second message with txn2
    MessageId messageId = consumer.receive().getMessageId();
    consumer.acknowledgeAsync(messageId, txn2).get();

    // commit txn1
    txn1.commit().get();
    // abort txn2
    txn2.abort().get();

    Transaction txn3 = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
    // acking the second message again, now with txn3, must succeed
    consumer.acknowledgeAsync(messageId, txn3).get();
}
/**
 * When changing the pending-ack handle state fails, the subscribe command must complete exceptionally.
 * see: https://github.com/apache/pulsar/pull/16248.
 *
 * <p>The mocked {@link PendingAckStore} answers {@code replayAsync} by closing the pending-ack handle and
 * then signalling {@code replayComplete()}. {@code addConsumer} is expected to fail with a
 * {@link BrokerServiceException.ServiceUnitNotReadyException} as the cause.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the future
 * @throws TimeoutException if {@code addConsumer} does not complete within 5 seconds
 */
@Test
public void testPendingAckReplayChangeStateError() throws InterruptedException, TimeoutException {
    // Create Executor
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    // The executor owns a non-daemon thread; shut it down whatever the outcome of the test.
    try {
        // Mock serviceConfiguration.
        ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
        when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
        // Mock executorProvider.
        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
        when(executorProvider.getExecutor()).thenReturn(executorService);
        when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService);
        // Mock pendingAckStore: replay closes the handle first, then reports completion.
        PendingAckStore pendingAckStore = mock(PendingAckStore.class);
        doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) {
                executorService.execute(() -> {
                    PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0];
                    pendingAckHandle.close();
                    MLPendingAckReplyCallBack mlPendingAckReplyCallBack
                            = new MLPendingAckReplyCallBack(pendingAckHandle);
                    mlPendingAckReplyCallBack.replayComplete();
                });
                return null;
            }
        }).when(pendingAckStore).replayAsync(any(), any());
        // Mock pendingAckStoreProvider.
        TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
        when(pendingAckStoreProvider.checkInitializedBefore(any()))
                .thenReturn(CompletableFuture.completedFuture(true));
        when(pendingAckStoreProvider.newPendingAckStore(any()))
                .thenReturn(CompletableFuture.completedFuture(pendingAckStore));
        // Mock pulsar.
        PulsarService pulsar = mock(PulsarService.class);
        when(pulsar.getConfig()).thenReturn(serviceConfiguration);
        when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
        when(pulsar.getTransactionPendingAckStoreProvider()).thenReturn(pendingAckStoreProvider);
        // Mock brokerService.
        BrokerService brokerService = mock(BrokerService.class);
        when(brokerService.getPulsar()).thenReturn(pulsar);
        when(brokerService.pulsar()).thenReturn(pulsar);
        // Mock topic.
        PersistentTopic topic = mock(PersistentTopic.class);
        when(topic.getBrokerService()).thenReturn(brokerService);
        when(topic.getName()).thenReturn("topic-a");
        // Mock cursor for subscription.
        ManagedCursor subscriptionCursor = mock(ManagedCursor.class);
        doThrow(new RuntimeException("1")).when(subscriptionCursor).updateLastActive();
        // Create subscription.
        String subscriptionName = "sub-a";
        boolean replicated = false;
        Map<String, String> subscriptionProperties = Collections.emptyMap();
        PersistentSubscription persistentSubscription = new PersistentSubscription(topic, subscriptionName,
                subscriptionCursor, replicated, subscriptionProperties);
        org.apache.pulsar.broker.service.Consumer consumer = mock(org.apache.pulsar.broker.service.Consumer.class);
        try {
            CompletableFuture<Void> addConsumerFuture = persistentSubscription.addConsumer(consumer);
            addConsumerFuture.get(5, TimeUnit.SECONDS);
            fail("Expect failure by PendingAckHandle closed, but success");
        } catch (ExecutionException executionException){
            Throwable t = executionException.getCause();
            Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
        }
    } finally {
        executorService.shutdownNow();
    }
}
/**
 * When changing the transaction buffer (TB) state fails, the producer command must complete exceptionally.
 * see: https://github.com/apache/pulsar/pull/16248.
 *
 * <p>The mocked recovery executor answers {@code execute} by starting a thread that closes the topic's
 * transaction buffer and then calls {@code recoverComplete()} on the recover task's callback.
 * {@code checkIfTransactionBufferRecoverCompletely(true)} is expected to fail with a
 * {@link BrokerServiceException.ServiceUnitNotReadyException} as the cause.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the future
 * @throws TimeoutException if the check does not complete within 5 seconds
 */
@Test
public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException {
    // Holder so the executor answer below can reach the topic, which is created at the end of the setup.
    final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<>();
    // Mock the executor that runs the transaction-buffer recover task.
    ScheduledExecutorService executorService_recover = mock(ScheduledExecutorService.class);
    // Mock serviceConfiguration.
    ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
    when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false);
    when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
    // Mock executorProvider.
    ExecutorProvider executorProvider = mock(ExecutorProvider.class);
    when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService_recover);
    // Instead of running the recover task, close the buffer first and then report recovery complete.
    doAnswer(new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) {
            new Thread(() -> {
                TopicTransactionBuffer.TopicTransactionBufferRecover recover
                        = (TopicTransactionBuffer.TopicTransactionBufferRecover) invocation.getArguments()[0];
                TopicTransactionBufferRecoverCallBack callBack
                        = Whitebox.getInternalState(recover, "callBack");
                try {
                    persistentTopic.get().getTransactionBuffer().closeAsync().get();
                } catch (InterruptedException e) {
                    // Preserve the interrupt status before giving up.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
                callBack.recoverComplete();
            }).start();
            return null;
        }
    }).when(executorService_recover).execute(any());
    // Mock TransactionBufferSnapshotService
    TransactionBufferSnapshotService transactionBufferSnapshotService
            = mock(TransactionBufferSnapshotService.class);
    SystemTopicClient.Writer writer = mock(SystemTopicClient.Writer.class);
    when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
    when(transactionBufferSnapshotService.createWriter(any()))
            .thenReturn(CompletableFuture.completedFuture(writer));
    // Mock pulsar.
    PulsarService pulsar = mock(PulsarService.class);
    when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
    when(pulsar.getConfig()).thenReturn(serviceConfiguration);
    when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
    when(pulsar.getTransactionBufferSnapshotService()).thenReturn(transactionBufferSnapshotService);
    TopicTransactionBufferProvider topicTransactionBufferProvider = new TopicTransactionBufferProvider();
    when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider);
    // Mock BacklogQuotaManager
    BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
    // Mock brokerService.
    BrokerService brokerService = mock(BrokerService.class);
    when(brokerService.getPulsar()).thenReturn(pulsar);
    when(brokerService.pulsar()).thenReturn(pulsar);
    when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);
    // Mock managedLedger.
    ManagedLedgerImpl managedLedger = mock(ManagedLedgerImpl.class);
    ManagedCursorContainer managedCursors = new ManagedCursorContainer();
    when(managedLedger.getCursors()).thenReturn(managedCursors);
    PositionImpl position = PositionImpl.EARLIEST;
    when(managedLedger.getLastConfirmedEntry()).thenReturn(position);
    // Create topic.
    persistentTopic.set(new PersistentTopic("topic-a", managedLedger, brokerService));
    try {
        // Do check.
        persistentTopic.get().checkIfTransactionBufferRecoverCompletely(true).get(5, TimeUnit.SECONDS);
        fail("Expect failure by TB closed, but it is finished.");
    } catch (ExecutionException executionException){
        Throwable t = executionException.getCause();
        Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
    }
}
}