blob: af67b06587274a2eeec7cff09c5c6f5d296f4789 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
* End to end transaction test.
*/
@Slf4j
@Test(groups = "flaky")
public class TransactionEndToEndTest extends TransactionTestBase {
private static final int TOPIC_PARTITION = 3;
private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
}
@AfterMethod(alwaysRun = true)
protected void cleanup() {
super.internalCleanup();
}
@Test
public void noBatchProduceCommitTest() throws Exception {
produceCommitTest(false);
}
@Test
public void batchProduceCommitTest() throws Exception {
produceCommitTest(true);
}
private void produceCommitTest(boolean enableBatch) throws Exception {
@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(TOPIC_OUTPUT)
.subscriptionName("test")
.enableBatchIndexAcknowledgment(true)
.subscribe();
Awaitility.await().until(consumer::isConnected);
ProducerBuilder<byte[]> producerBuilder = pulsarClient
.newProducer()
.topic(TOPIC_OUTPUT)
.enableBatching(enableBatch)
.sendTimeout(0, TimeUnit.SECONDS);
@Cleanup
Producer<byte[]> producer = producerBuilder.create();
Transaction txn1 = getTxn();
Transaction txn2 = getTxn();
int txnMessageCnt = 0;
int messageCnt = 1000;
for (int i = 0; i < messageCnt; i++) {
if (i % 5 == 0) {
producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
} else {
producer.newMessage(txn2).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
}
txnMessageCnt++;
}
// Can't receive transaction messages before commit.
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
txn1.commit().get();
txn2.commit().get();
int receiveCnt = 0;
for (int i = 0; i < txnMessageCnt; i++) {
message = consumer.receive();
Assert.assertNotNull(message);
receiveCnt ++;
}
Assert.assertEquals(txnMessageCnt, receiveCnt);
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
log.info("message commit test enableBatch {}", enableBatch);
}
@Test
public void produceAbortTest() throws Exception {
Transaction txn = getTxn();
String subName = "test";
@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(TOPIC_OUTPUT)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();
int messageCnt = 10;
for (int i = 0; i < messageCnt; i++) {
producer.newMessage(txn).value(("Hello Txn - " + i).getBytes(UTF_8)).send();
}
@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(TOPIC_OUTPUT)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName(subName)
.enableBatchIndexAcknowledgment(true)
.subscribe();
Awaitility.await().until(consumer::isConnected);
// Can't receive transaction messages before abort.
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
txn.abort().get();
// Cant't receive transaction messages after abort.
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
Awaitility.await().until(() -> {
boolean flag = true;
for (int partition = 0; partition < TOPIC_PARTITION; partition ++) {
String topic;
topic = TopicName.get(TOPIC_OUTPUT).getPartition(partition).toString();
boolean exist = false;
for (int i = 0; i < getPulsarServiceList().size(); i++) {
Field field = BrokerService.class.getDeclaredField("topics");
field.setAccessible(true);
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
(ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) field
.get(getPulsarServiceList().get(i).getBrokerService());
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
if (topicFuture != null) {
Optional<Topic> topicOptional = topicFuture.get();
if (topicOptional.isPresent()) {
PersistentSubscription persistentSubscription =
(PersistentSubscription) topicOptional.get().getSubscription(subName);
Position markDeletePosition = persistentSubscription.getCursor().getMarkDeletedPosition();
Position lastConfirmedEntry = persistentSubscription.getCursor()
.getManagedLedger().getLastConfirmedEntry();
exist = true;
if (!markDeletePosition.equals(lastConfirmedEntry)) {
//this because of the transaction commit marker have't delete
//delete commit marker after ack position
//when delete commit marker operation is processing, next delete operation will not do again
//when delete commit marker operation finish, it can run next delete commit marker operation
//so this test may not delete all the position in this manageLedger.
Position markerPosition = ((ManagedLedgerImpl) persistentSubscription.getCursor()
.getManagedLedger()).getNextValidPosition((PositionImpl) markDeletePosition);
//marker is the lastConfirmedEntry, after commit the marker will only be write in
if (!markerPosition.equals(lastConfirmedEntry)) {
log.error("Mark delete position is not commit marker position!");
flag = false;
}
}
}
}
}
assertTrue(exist);
}
return flag;
});
log.info("finished test partitionAbortTest");
}
@Test
public void txnIndividualAckTestNoBatchAndSharedSub() throws Exception {
txnAckTest(false, 1, SubscriptionType.Shared);
}
@Test
public void txnIndividualAckTestBatchAndSharedSub() throws Exception {
txnAckTest(true, 200, SubscriptionType.Shared);
}
@Test
public void txnIndividualAckTestNoBatchAndFailoverSub() throws Exception {
txnAckTest(false, 1, SubscriptionType.Failover);
}
@Test
public void txnIndividualAckTestBatchAndFailoverSub() throws Exception {
txnAckTest(true, 200, SubscriptionType.Failover);
}
private void txnAckTest(boolean batchEnable, int maxBatchSize,
SubscriptionType subscriptionType) throws Exception {
String normalTopic = NAMESPACE1 + "/normal-topic";
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(normalTopic)
.subscriptionName("test")
.enableBatchIndexAcknowledgment(true)
.subscriptionType(subscriptionType)
.subscribe();
Awaitility.await().until(consumer::isConnected);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(normalTopic)
.enableBatching(batchEnable)
.batchingMaxMessages(maxBatchSize)
.create();
for (int retryCnt = 0; retryCnt < 2; retryCnt++) {
Transaction txn = getTxn();
int messageCnt = 1000;
// produce normal messages
for (int i = 0; i < messageCnt; i++){
producer.newMessage().value("hello".getBytes()).sendAsync();
}
// consume and ack messages with txn
for (int i = 0; i < messageCnt; i++) {
Message<byte[]> message = consumer.receive();
Assert.assertNotNull(message);
log.info("receive msgId: {}, count : {}", message.getMessageId(), i);
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
}
// the messages are pending ack state and can't be received
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
// 1) txn abort
txn.abort().get();
// after transaction abort, the messages could be received
Transaction commitTxn = getTxn();
for (int i = 0; i < messageCnt; i++) {
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
consumer.acknowledgeAsync(message.getMessageId(), commitTxn).get();
log.info("receive msgId: {}, count: {}", message.getMessageId(), i);
}
// 2) ack committed by a new txn
commitTxn.commit().get();
// after transaction commit, the messages can't be received
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(commitTxn, TransactionImpl.State.OPEN);
try {
commitTxn.commit().get();
fail("recommit one transaction should be failed.");
} catch (Exception reCommitError) {
// recommit one transaction should be failed
log.info("expected exception for recommit one transaction.");
Assert.assertNotNull(reCommitError);
Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
}
}
}
@Test
public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
String topicOne = "persistent://" + NAMESPACE1 + "/topic-one";
String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two";
String sub = "test";
admin.topics().createNonPartitionedTopic(topicOne);
admin.topics().createSubscription(topicOne, "test", MessageId.earliest);
admin.topics().delete(topicOne);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicTwo).subscriptionName(sub).subscribe();
String content = "test";
producer.send(content);
assertEquals(consumer.receive().getValue(), content);
}
@Test
public void txnMessageAckTest() throws Exception {
String topic = TOPIC_MESSAGE_ACK_TEST;
final String subName = "test";
@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(topic)
.subscriptionName(subName)
.enableBatchIndexAcknowledgment(true)
.acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
.subscribe();
Awaitility.await().until(consumer::isConnected);
@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();
Transaction txn = getTxn();
int messageCnt = 10;
for (int i = 0; i < messageCnt; i++) {
producer.newMessage(txn).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
}
log.info("produce transaction messages finished");
// Can't receive transaction messages before commit.
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
log.info("transaction messages can't be received before transaction committed");
txn.commit().get();
int ackedMessageCount = 0;
int receiveCnt = 0;
for (int i = 0; i < messageCnt; i++) {
message = consumer.receive();
Assert.assertNotNull(message);
receiveCnt ++;
if (i % 2 == 0) {
consumer.acknowledge(message);
ackedMessageCount ++;
}
}
Assert.assertEquals(messageCnt, receiveCnt);
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
String checkTopic = TopicName.get(topic).getPartition(0).toString();
PersistentTopicInternalStats stats = admin.topics().getInternalStats(checkTopic, false);
Assert.assertNotEquals(stats.cursors.get(subName).markDeletePosition, stats.lastConfirmedEntry);
consumer.redeliverUnacknowledgedMessages();
receiveCnt = 0;
for (int i = 0; i < messageCnt - ackedMessageCount; i++) {
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
consumer.acknowledge(message);
receiveCnt ++;
}
Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt);
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
topic = TopicName.get(topic).getPartition(0).toString();
boolean exist = false;
for (int i = 0; i < getPulsarServiceList().size(); i++) {
Field field = BrokerService.class.getDeclaredField("topics");
field.setAccessible(true);
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
(ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) field
.get(getPulsarServiceList().get(i).getBrokerService());
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
if (topicFuture != null) {
Optional<Topic> topicOptional = topicFuture.get();
if (topicOptional.isPresent()) {
PersistentSubscription persistentSubscription =
(PersistentSubscription) topicOptional.get().getSubscription(subName);
Position markDeletePosition = persistentSubscription.getCursor().getMarkDeletedPosition();
Position lastConfirmedEntry = persistentSubscription.getCursor()
.getManagedLedger().getLastConfirmedEntry();
exist = true;
if (!markDeletePosition.equals(lastConfirmedEntry)) {
//this because of the transaction commit marker have't delete
//delete commit marker after ack position
//when delete commit marker operation is processing, next delete operation will not do again
//when delete commit marker operation finish, it can run next delete commit marker operation
//so this test may not delete all the position in this manageLedger.
Position markerPosition = ((ManagedLedgerImpl) persistentSubscription.getCursor()
.getManagedLedger()).getNextValidPosition((PositionImpl) markDeletePosition);
//marker is the lastConfirmedEntry, after commit the marker will only be write in
if (!markerPosition.equals(lastConfirmedEntry)) {
log.error("Mark delete position is not commit marker position!");
fail();
}
}
}
}
}
assertTrue(exist);
log.info("receive transaction messages count: {}", receiveCnt);
}
@Test
public void txnAckTestBatchAndCumulativeSub() throws Exception {
txnCumulativeAckTest(true, 200, SubscriptionType.Failover);
}
@Test
public void txnAckTestNoBatchAndCumulativeSub() throws Exception {
txnCumulativeAckTest(false, 1, SubscriptionType.Failover);
}
private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, SubscriptionType subscriptionType)
throws Exception {
String normalTopic = NAMESPACE1 + "/normal-topic";
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(normalTopic)
.subscriptionName("test")
.enableBatchIndexAcknowledgment(true)
.subscriptionType(subscriptionType)
.ackTimeout(1, TimeUnit.MINUTES)
.subscribe();
Awaitility.await().until(consumer::isConnected);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(normalTopic)
.enableBatching(batchEnable)
.batchingMaxMessages(maxBatchSize)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.create();
for (int retryCnt = 0; retryCnt < 2; retryCnt++) {
Transaction abortTxn = getTxn();
int messageCnt = 1000;
// produce normal messages
for (int i = 0; i < messageCnt; i++){
producer.newMessage().value("hello".getBytes()).sendAsync();
}
Message<byte[]> message = null;
Thread.sleep(1000L);
for (int i = 0; i < messageCnt; i++) {
message = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(message);
if (i % 3 == 0) {
consumer.acknowledgeCumulativeAsync(message.getMessageId(), abortTxn).get();
}
log.info("receive msgId abort: {}, retryCount : {}, count : {}", message.getMessageId(), retryCnt, i);
}
try {
consumer.acknowledgeCumulativeAsync(message.getMessageId(), abortTxn).get();
fail("not ack conflict ");
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
}
try {
consumer.acknowledgeCumulativeAsync(DefaultImplementation.getDefaultImplementation()
.newMessageId(((MessageIdImpl) message.getMessageId()).getLedgerId(),
((MessageIdImpl) message.getMessageId()).getEntryId() - 1, -1),
abortTxn).get();
fail("not ack conflict ");
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
}
// the messages are pending ack state and can't be received
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
abortTxn.abort().get();
Transaction commitTxn = getTxn();
for (int i = 0; i < messageCnt; i++) {
message = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(message);
if (i % 3 == 0) {
consumer.acknowledgeCumulativeAsync(message.getMessageId(), commitTxn).get();
}
log.info("receive msgId abort: {}, retryCount : {}, count : {}", message.getMessageId(), retryCnt, i);
}
commitTxn.commit().get();
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(commitTxn, TransactionImpl.State.OPEN);
try {
commitTxn.commit().get();
fail("recommit one transaction should be failed.");
} catch (Exception reCommitError) {
// recommit one transaction should be failed
log.info("expected exception for recommit one transaction.");
Assert.assertNotNull(reCommitError);
Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
}
message = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNull(message);
}
}
private Transaction getTxn() throws Exception {
return pulsarClient
.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS)
.build()
.get();
}
private void markDeletePositionCheck(String topic, String subName, boolean equalsWithLastConfirm) throws Exception {
for (int i = 0; i < TOPIC_PARTITION; i++) {
PersistentTopicInternalStats stats = null;
String checkTopic = TopicName.get(topic).getPartition(i).toString();
for (int j = 0; j < 10; j++) {
stats = admin.topics().getInternalStats(checkTopic, false);
if (stats.lastConfirmedEntry.equals(stats.cursors.get(subName).markDeletePosition)) {
break;
}
Thread.sleep(200);
}
if (equalsWithLastConfirm) {
Assert.assertEquals(stats.cursors.get(subName).markDeletePosition, stats.lastConfirmedEntry);
} else {
Assert.assertNotEquals(stats.cursors.get(subName).markDeletePosition, stats.lastConfirmedEntry);
}
}
}
@Test
public void txnMetadataHandlerRecoverTest() throws Exception {
String topic = NAMESPACE1 + "/tc-metadata-handler-recover";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.create();
List<TxnID> txnIDList = new ArrayList<>();
int txnCnt = 20;
int messageCnt = 10;
for (int i = 0; i < txnCnt; i++) {
TransactionImpl txn = (TransactionImpl) pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build().get();
for (int j = 0; j < messageCnt; j++) {
producer.newMessage(txn).value("Hello".getBytes()).sendAsync().get();
}
txnIDList.add(new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
}
@Cleanup
PulsarClientImpl recoverPulsarClient = (PulsarClientImpl) PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
TransactionCoordinatorClient tcClient = recoverPulsarClient.getTcClient();
for (TxnID txnID : txnIDList) {
tcClient.commit(txnID);
}
@Cleanup
Consumer<byte[]> consumer = recoverPulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Awaitility.await().until(consumer::isConnected);
for (int i = 0; i < txnCnt * messageCnt; i++) {
Message<byte[]> message = consumer.receive();
Assert.assertNotNull(message);
}
}
@Test
public void produceTxnMessageOrderTest() throws Exception {
String topic = NAMESPACE1 + "/txn-produce-order";
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();
Awaitility.await().until(consumer::isConnected);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.producerName("txn-publish-order")
.create();
for (int ti = 0; ti < 10; ti++) {
Transaction txn = pulsarClient
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
.build().get();
for (int i = 0; i < 1000; i++) {
producer.newMessage(txn).value(("" + i).getBytes()).sendAsync();
}
txn.commit().get();
for (int i = 0; i < 1000; i++) {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertEquals(Integer.valueOf(new String(message.getData())), new Integer(i));
}
}
}
@Test
public void produceAndConsumeCloseStateTxnTest() throws Exception {
String topic = NAMESPACE1 + "/txn-close-state";
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.producerName("txn-close-state")
.create();
Transaction produceTxn = pulsarClient
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
.build().get();
Transaction consumeTxn = pulsarClient
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
.build().get();
producer.newMessage(produceTxn).value(("Hello Pulsar!").getBytes()).sendAsync().get();
produceTxn.commit().get();
try {
producer.newMessage(produceTxn).value(("Hello Pulsar!").getBytes()).sendAsync().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}
try {
produceTxn.commit().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();
consumeTxn.commit().get();
try {
consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}
try {
consumeTxn.commit().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}
Transaction timeoutTxn = pulsarClient
.newTransaction()
.withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();
AtomicReference<TransactionMetadataStore> transactionMetadataStore = new AtomicReference<>();
getPulsarServiceList().forEach(pulsarService -> {
if (pulsarService.getTransactionMetadataStoreService().getStores()
.containsKey(TransactionCoordinatorID.get(((TransactionImpl) timeoutTxn).getTxnIdMostBits()))) {
transactionMetadataStore.set(pulsarService.getTransactionMetadataStoreService().getStores()
.get(TransactionCoordinatorID.get(((TransactionImpl) timeoutTxn).getTxnIdMostBits())));
}
});
Awaitility.await().until(() -> {
try {
transactionMetadataStore.get().getTxnMeta(new TxnID(((TransactionImpl) timeoutTxn)
.getTxnIdMostBits(), ((TransactionImpl) timeoutTxn).getTxnIdLeastBits())).get();
return false;
} catch (Exception e) {
return true;
}
});
Class<TransactionImpl> transactionClass = TransactionImpl.class;
Constructor<TransactionImpl> constructor = transactionClass
.getDeclaredConstructor(PulsarClientImpl.class, long.class, long.class, long.class);
constructor.setAccessible(true);
TransactionImpl timeoutTxnSkipClientTimeout = constructor.newInstance(pulsarClient, 5,
timeoutTxn.getTxnID().getLeastSigBits(), timeoutTxn.getTxnID().getMostSigBits());
try {
timeoutTxnSkipClientTimeout.commit().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionNotFoundException);
}
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxnSkipClientTimeout);
assertEquals(state, TransactionImpl.State.ERROR);
}
@Test
public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService()
.newTransaction(new TransactionCoordinatorID(0), 1).get();
Awaitility.await().until(() -> {
try {
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
return false;
} catch (Exception e) {
return true;
}
});
Collection<TransactionMetadataStore> transactionMetadataStores =
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
long timeoutCount = transactionMetadataStores.stream()
.mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
Assert.assertEquals(timeoutCount, 1);
}
@Test
public void transactionTimeoutTest() throws Exception {
String topic = NAMESPACE1 + "/txn-timeout";
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.subscribe();
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.producerName("txn-timeout")
.create();
producer.send("Hello Pulsar!");
Transaction consumeTimeoutTxn = pulsarClient
.newTransaction()
.withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
Message<String> message = consumer.receive();
consumer.acknowledgeAsync(message.getMessageId(), consumeTimeoutTxn).get();
Message<String> reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS);
assertNull(reReceiveMessage);
reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS);
assertEquals(reReceiveMessage.getValue(), message.getValue());
assertEquals(reReceiveMessage.getMessageId(), message.getMessageId());
}
@DataProvider(name = "ackType")
public static Object[] ackType() {
return new Object[] {CommandAck.AckType.Cumulative, CommandAck.AckType.Individual};
}
@Test(dataProvider = "ackType")
public void txnTransactionRedeliverNullDispatcher(CommandAck.AckType ackType) throws Exception {
String topic = NAMESPACE1 + "/txnTransactionRedeliverNullDispatcher";
final String subName = "test";
@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(topic)
.subscriptionName(subName)
.enableBatchIndexAcknowledgment(true)
.acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
.subscribe();
Awaitility.await().until(consumer::isConnected);
@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();
int messageCnt = 10;
for (int i = 0; i < messageCnt; i++) {
producer.send(("Hello Txn - " + i).getBytes(UTF_8));
}
Transaction txn = getTxn();
if (ackType == CommandAck.AckType.Individual) {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn);
} else {
consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), txn);
}
topic = TopicName.get(topic).toString();
boolean exist = false;
for (int i = 0; i < getPulsarServiceList().size(); i++) {
Field field = BrokerService.class.getDeclaredField("topics");
field.setAccessible(true);
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
(ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) field
.get(getPulsarServiceList().get(i).getBrokerService());
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
if (topicFuture != null) {
Optional<Topic> topicOptional = topicFuture.get();
if (topicOptional.isPresent()) {
PersistentSubscription persistentSubscription =
(PersistentSubscription) topicOptional.get().getSubscription(subName);
field = persistentSubscription.getClass().getDeclaredField("dispatcher");
field.setAccessible(true);
field.set(persistentSubscription, null);
exist = true;
}
}
}
txn.abort().get();
assertTrue(exist);
}
@Test
public void oneTransactionOneTopicWithMultiSubTest() throws Exception {
String topic = NAMESPACE1 + "/oneTransactionOneTopicWithMultiSubTest";
final String subName1 = "test1";
final String subName2 = "test2";
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient
.newConsumer()
.topic(topic)
.subscriptionName(subName1)
.acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
.subscribe();
Awaitility.await().until(consumer1::isConnected);
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient
.newConsumer()
.topic(topic)
.subscriptionName(subName2)
.acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
.subscribe();
Awaitility.await().until(consumer2::isConnected);
@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();
MessageId messageId = producer.send(("Hello Pulsar").getBytes(UTF_8));
TransactionImpl txn = (TransactionImpl) getTxn();
consumer1.acknowledgeAsync(messageId, txn).get();
consumer2.acknowledgeAsync(messageId, txn).get();
boolean flag = false;
for (int i = 0; i < getPulsarServiceList().size(); i++) {
TransactionMetadataStoreService transactionMetadataStoreService =
getPulsarServiceList().get(i).getTransactionMetadataStoreService();
if (transactionMetadataStoreService.getStores()
.containsKey(TransactionCoordinatorID.get(txn.getTxnIdMostBits()))) {
List<TransactionSubscription> list = transactionMetadataStoreService
.getTxnMeta(new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())).get().ackedPartitions();
flag = true;
assertEquals(list.size(), 2);
if (list.get(0).getSubscription().equals(subName1)) {
assertEquals(list.get(1).getSubscription(), subName2);
} else {
assertEquals(list.get(0).getSubscription(), subName2);
assertEquals(list.get(1).getSubscription(), subName1);
}
}
}
assertTrue(flag);
}
@Test
public void testTxnTimeOutInClient() throws Exception{
String topic = NAMESPACE1 + "/testTxnTimeOutInClient";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
.topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).consumerName("testTxnTimeOut_consumer")
.topic(topic).subscriptionName("testTxnTimeOut_sub").subscribe();
Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();
producer.newMessage().send();
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
});
try {
producer.newMessage(transaction).send();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause().getCause() instanceof TransactionCoordinatorClientException
.InvalidTxnStatusException);
}
try {
Message<String> message = consumer.receive();
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException
.InvalidTxnStatusException);
}
}
}