/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.transaction;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.Unpooled;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.Bytes;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessagesImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Pulsar client transaction test.
 */
@Slf4j
@Test(groups = "broker")
public class TransactionTest extends TransactionTestBase {

    // Broker count handed to setUpBase() as its first argument in setup().
    private static final int NUM_BROKERS = 1;
    // Partition count handed to setUpBase() as its second argument in setup();
    // presumably the number of transaction coordinator partitions — TODO confirm against TransactionTestBase.
    private static final int NUM_PARTITIONS = 1;

    /**
     * Runs once before the tests of this class and delegates to {@code setUpBase}, passing the broker
     * count, the partition count, the string {@code NAMESPACE1 + "/test"} and {@code 0}.
     *
     * @throws Exception if the base setup fails
     */
    @BeforeClass
    protected void setup() throws Exception {
        final String baseSetupTarget = NAMESPACE1 + "/test";
        setUpBase(NUM_BROKERS, NUM_PARTITIONS, baseSetupTarget, 0);
    }

    /**
     * Runs once after the tests of this class and delegates to the base class's {@code internalCleanup()}.
     * {@code alwaysRun = true} asks TestNG to run this teardown even when earlier methods failed or
     * were skipped.
     *
     * @throws Exception if the base cleanup fails
     */
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }


    /**
     * Checks the transaction counters exposed in the topic stats: one transaction is committed, one is
     * aborted and a third is left open before the stats are read; each counter is then expected to be 1.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testTopicTransactionMetrics() throws Exception {
        final String topic = "persistent://tnx/ns1/test_transaction_topic";

        @Cleanup
        Producer<byte[]> txnProducer = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();

        // Transaction #1: publish a single message, then commit.
        Transaction committedTxn = pulsarClient.newTransaction()
                .withTransactionTimeout(1, TimeUnit.MINUTES)
                .build()
                .get();
        byte[] committedPayload = UUID.randomUUID().toString().getBytes(UTF_8);
        txnProducer.newMessage(committedTxn).value(committedPayload).send();
        committedTxn.commit().get();

        // Transaction #2: publish a single message, then abort.
        Transaction abortedTxn = pulsarClient.newTransaction()
                .withTransactionTimeout(1, TimeUnit.MINUTES)
                .build()
                .get();
        byte[] abortedPayload = UUID.randomUUID().toString().getBytes(UTF_8);
        txnProducer.newMessage(abortedTxn).value(abortedPayload).send();
        abortedTxn.abort().get();

        // Transaction #3: publish a single message and leave the transaction open.
        Transaction ongoingTxn = pulsarClient.newTransaction()
                .withTransactionTimeout(1, TimeUnit.MINUTES)
                .build()
                .get();
        byte[] ongoingPayload = UUID.randomUUID().toString().getBytes(UTF_8);
        txnProducer.newMessage(ongoingTxn).value(ongoingPayload).send();

        // Read the stats straight from the broker-side topic object.
        Optional<Topic> loadedTopic = pulsarServiceList.get(0).getBrokerService().getTopic(topic, false).get();
        assertTrue(loadedTopic.isPresent());
        TopicStatsImpl topicStats = ((PersistentTopic) loadedTopic.get()).getStats(false, false, false);

        assertEquals(topicStats.committedTxnCount, 1);
        assertEquals(topicStats.abortedTxnCount, 1);
        assertEquals(topicStats.ongoingTxnCount, 1);
    }

    /**
     * Checks that the pending-ack system topic of a subscription is hidden from the namespace topic
     * listing and cannot be subscribed to, queried for subscriptions, or created (partitioned or
     * non-partitioned) by a user.
     *
     * @throws Exception if any client or admin call fails unexpectedly
     */
    @Test
    public void testCreateTransactionSystemTopic() throws Exception {
        final String subName = "test";
        final String userTopic = TopicName.get(NAMESPACE1 + "/testCreateTransactionSystemTopic").toString();

        try {
            // A transactional ack on the subscription initialises its pending-ack state.
            @Cleanup
            Consumer<byte[]> ackingConsumer = getConsumer(userTopic, subName);
            Transaction txn = pulsarClient.newTransaction()
                    .withTransactionTimeout(10, TimeUnit.SECONDS)
                    .build()
                    .get();

            MessageIdImpl arbitraryId = new MessageIdImpl(10, 10, 10);
            ackingConsumer.acknowledgeAsync(arbitraryId, txn).get();
        } catch (ExecutionException e) {
            // If the ack fails, the only accepted failure is a transaction conflict.
            assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
        }

        // From here on every call targets the pending-ack topic derived from the user topic.
        final String pendingAckTopic = MLPendingAckStore.getTransactionPendingAckStoreSuffix(userTopic, subName);

        // The namespace listing must not expose the transaction system topic.
        List<String> listedTopics = admin.topics().getList(NAMESPACE1);
        assertEquals(listedTopics.size(), 2);
        for (String listedTopic : listedTopics) {
            assertFalse(listedTopic.contains(PENDING_ACK_STORE_SUFFIX));
        }

        // Subscribing to the system topic is rejected.
        try {
            @Cleanup
            Consumer<byte[]> rejectedConsumer = getConsumer(pendingAckTopic, subName);
            fail();
        } catch (PulsarClientException.NotAllowedException e) {
            assertTrue(e.getMessage().contains("Can not create transaction system topic"));
        }

        // Listing subscriptions of the system topic is rejected.
        try {
            admin.topics().getSubscriptions(pendingAckTopic);
            fail();
        } catch (PulsarAdminException e) {
            assertEquals(e.getMessage(), "Can not create transaction system topic " + pendingAckTopic);
        }

        // Creating it as a partitioned topic is rejected.
        try {
            admin.topics().createPartitionedTopic(pendingAckTopic, 3);
            fail();
        } catch (PulsarAdminException e) {
            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
        }

        // Creating it as a non-partitioned topic is rejected.
        try {
            admin.topics().createNonPartitionedTopic(pendingAckTopic);
            fail();
        } catch (PulsarAdminException e) {
            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
        }
    }

    /**
     * After unloading the system namespace and {@code NAMESPACE1}, re-subscribes and waits until a new
     * transaction can be built, then checks that the broker's topic map holds no entry for the
     * transaction log topic, the coordinator-assign partition or the pending-ack topic.
     *
     * @throws Exception if any client or admin call fails
     */
    @Test
    public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
        final String subName = "test";
        final String topicName = TopicName.get(NAMESPACE1 + "/test").toString();

        @Cleanup
        Consumer<byte[]> firstConsumer = getConsumer(topicName, subName);

        firstConsumer.close();

        // Poll until a transaction can be created.
        Awaitility.await().until(() -> {
            try {
                pulsarClient.newTransaction()
                        .withTransactionTimeout(30, TimeUnit.SECONDS)
                        .build()
                        .get();
                return true;
            } catch (Exception e) {
                return false;
            }
        });

        admin.namespaces().unload(NamespaceName.SYSTEM_NAMESPACE.toString());
        admin.namespaces().unload(NAMESPACE1);

        @Cleanup
        Consumer<byte[]> secondConsumer = getConsumer(topicName, subName);

        // Poll again: transactions must become available after the unload as well.
        Awaitility.await().until(() -> {
            try {
                pulsarClient.newTransaction()
                        .withTransactionTimeout(30, TimeUnit.SECONDS)
                        .build()
                        .get();
                return true;
            } catch (Exception e) {
                return false;
            }
        });

        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> brokerTopics =
                getPulsarServiceList().get(0).getBrokerService().getTopics();

        String txnLogTopic = TopicName.get(TopicDomain.persistent.value(),
                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString() + 0;
        Assert.assertNull(brokerTopics.get(txnLogTopic));

        String coordinatorAssignTopic =
                SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString();
        Assert.assertNull(brokerTopics.get(coordinatorAssignTopic));

        String pendingAckTopic = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);
        Assert.assertNull(brokerTopics.get(pendingAckTopic));
    }

    /**
     * Subscribes to {@code topicName} with a {@link SubscriptionType#Shared} subscription that has batch
     * index acknowledgment enabled.
     *
     * @param topicName topic to subscribe to
     * @param subName   subscription name
     * @return the subscribed consumer; the caller is responsible for closing it
     * @throws PulsarClientException if the subscription cannot be created
     */
    public Consumer<byte[]> getConsumer(String topicName, String subName) throws PulsarClientException {
        Consumer<byte[]> sharedConsumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared)
                .enableBatchIndexAcknowledgment(true)
                .subscribe();
        return sharedConsumer;
    }

    /**
     * Builds two transactions in a row and checks the ids they are given: the first is expected to be
     * {@code (mostSigBits = 0, leastSigBits = 0)} and the second {@code (0, 1)}.
     *
     * <p>Both transactions are aborted and the aborts are awaited, so that an abort failure is reported
     * by this test instead of being dropped, and no transaction is left open on the broker shared with
     * the other tests of this class.
     *
     * @throws Exception if a transaction cannot be built or aborted
     */
    @Test
    public void testGetTxnID() throws Exception {
        Transaction firstTxn = pulsarClient.newTransaction()
                .build().get();
        TxnID firstId = firstTxn.getTxnID();
        Assert.assertEquals(firstId.getLeastSigBits(), 0);
        Assert.assertEquals(firstId.getMostSigBits(), 0);
        // Wait for the abort instead of discarding the returned future.
        firstTxn.abort().get();

        Transaction secondTxn = pulsarClient.newTransaction()
                .build().get();
        TxnID secondId = secondTxn.getTxnID();
        Assert.assertEquals(secondId.getLeastSigBits(), 1);
        Assert.assertEquals(secondId.getMostSigBits(), 0);
        // End the second transaction too so it does not stay open until it times out.
        secondTxn.abort().get();
    }

    /**
     * Creates a topic, evicts it from the broker's in-memory topic map and from the managed-ledger
     * factory's {@code ledgers} map, and checks that creating the same topic again is rejected with a
     * conflict. It then sets retention on the topic, subscribes, and - inside an asynchronous callback
     * chain - sets a different retention on the subscription's pending-ack topic and compares the
     * retention size reported by the pending-ack managed ledger configuration.
     *
     * <p>NOTE(review): the future returned by {@code getTopicIfExists(...).thenAccept(...)} (and the
     * nested {@code thenAccept} futures) is never awaited or inspected. An exception thrown inside a
     * {@code thenAccept} callback, including the {@code AssertionError} raised by {@code Assert.fail()} /
     * {@code Assert.assertEquals(...)}, only completes that discarded future exceptionally, so these
     * checks do not appear to be able to fail the test - confirm and consider joining the chain.
     *
     * <p>NOTE(review): the consumer returned by {@code subscribe()} is not kept and therefore never closed.
     */
    @Test
    public void testSubscriptionRecreateTopic()
            throws PulsarAdminException, NoSuchFieldException, IllegalAccessException, PulsarClientException {
        String topic = "persistent://pulsar/system/testReCreateTopic";
        String subName = "sub_testReCreateTopic";
        // Values used for the pending-ack topic ("SetTo") and for the user topic ("SetTopic").
        int retentionSizeInMbSetTo = 5;
        int retentionSizeInMbSetTopic = 6;
        int retentionSizeInMinutesSetTo = 5;
        int retentionSizeInMinutesSetTopic = 6;
        admin.topics().createNonPartitionedTopic(topic);
        PulsarService pulsarService = super.getPulsarServiceList().get(0);
        // Drop every topic from the broker's in-memory topic map.
        pulsarService.getBrokerService().getTopics().clear();
        ManagedLedgerFactory managedLedgerFactory = pulsarService.getBrokerService().getManagedLedgerFactory();
        // Reach into the factory's private "ledgers" map and remove this topic's entry as well.
        Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        field.setAccessible(true);
        ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers =
                (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) field.get(managedLedgerFactory);
        ledgers.remove(TopicName.get(topic).getPersistenceNamingEncoding());
        // Creating the topic a second time must be rejected with a conflict.
        try {
            admin.topics().createNonPartitionedTopic(topic);
            Assert.fail();
        } catch (PulsarAdminException.ConflictException e) {
            log.info("Cann`t create topic again");
        }
        admin.topics().setRetention(topic,
                new RetentionPolicies(retentionSizeInMinutesSetTopic, retentionSizeInMbSetTopic));
        pulsarClient.newConsumer().topic(topic)
                .subscriptionName(subName)
                .subscribe();
        // Everything below runs in callbacks of futures that this method does not wait for (see NOTE above).
        pulsarService.getBrokerService().getTopicIfExists(topic).thenAccept(option -> {
            if (!option.isPresent()) {
                log.error("Failed o get Topic named: {}", topic);
                Assert.fail();
            }
            PersistentTopic originPersistentTopic = (PersistentTopic) option.get();
            String pendingAckTopicName = MLPendingAckStore
                    .getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subName);

            // Give the pending-ack topic a retention that differs from the user topic's.
            try {
                admin.topics().setRetention(pendingAckTopicName,
                        new RetentionPolicies(retentionSizeInMinutesSetTo, retentionSizeInMbSetTo));
            } catch (PulsarAdminException e) {
                log.error("Failed to get./setRetention of topic with Exception:" + e);
                Assert.fail();
            }
            PersistentSubscription subscription = originPersistentTopic
                    .getSubscription(subName);
            subscription.getPendingAckManageLedger().thenAccept(managedLedger -> {
                long retentionSize = managedLedger.getConfig().getRetentionSizeInMB();
                if (!originPersistentTopic.getTopicPolicies().isPresent()) {
                    log.error("Failed to getTopicPolicies of :" + originPersistentTopic);
                    Assert.fail();
                }
                // NOTE(review): topicPolicies is assigned but not read afterwards.
                TopicPolicies topicPolicies = originPersistentTopic.getTopicPolicies().get();
                // The existing pending-ack ledger is compared against the user topic's size value.
                Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize);
                MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider();
                CompletableFuture<PendingAckStore> future = mlPendingAckStoreProvider.newPendingAckStore(subscription);
                // A store created through the provider is compared against the pending-ack topic's value.
                future.thenAccept(pendingAckStore -> {
                            ((MLPendingAckStore) pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> {
                                Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
                                        retentionSizeInMbSetTo);
                            });
                        }
                );
            });


        });


    }

    /**
     * Reads the namespace's transaction-buffer snapshot system topic while a producer is created on a
     * fresh topic and normal messages are published, and checks the {@code maxReadPositionEntryId}
     * carried by the snapshots: {@code -1} after the producer is built and {@code 1} after two
     * messages have been sent.
     *
     * <p>The reader and the producer are closed via {@code @Cleanup} when the method exits, so the test
     * does not leak client resources into the tests that share this client and broker.
     *
     * @throws Exception if any client or admin call fails
     */
    @Test
    public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception {
        String topic = "persistent://" + NAMESPACE1 + "/testSnapShot";
        admin.topics().createNonPartitionedTopic(topic);
        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                .getBrokerService().getTopic(topic, false)
                .get().get();

        ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
                .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
                .startMessageId(MessageId.latest)
                .topic(NAMESPACE1 + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
        @Cleanup
        Reader<TransactionBufferSnapshot> reader = readerBuilder.create();

        // Before any producer is built, no snapshot message is expected to be available to the reader.
        long waitSnapShotTime = getPulsarServiceList().get(0).getConfiguration()
                .getTransactionBufferSnapshotMinTimeInMillis();
        Awaitility.await().atMost(waitSnapShotTime * 2, TimeUnit.MILLISECONDS)
                .untilAsserted(() -> Assert.assertFalse(reader.hasMessageAvailable()));

        //test take snapshot by build producer by the transactionEnable client
        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS)
                .topic(topic).enableBatching(true)
                .create();

        Awaitility.await().untilAsserted(() -> {
            Message<TransactionBufferSnapshot> message1 = reader.readNext();
            TransactionBufferSnapshot snapshot1 = message1.getValue();
            Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), -1);
        });

        // test snapshot by publish  normal messages.
        producer.newMessage(Schema.STRING).value("common message send").send();
        producer.newMessage(Schema.STRING).value("common message send").send();

        Awaitility.await().untilAsserted(() -> {
            Message<TransactionBufferSnapshot> message1 = reader.readNext();
            TransactionBufferSnapshot snapshot1 = message1.getValue();
            Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 1);
        });
    }


    /**
     * Publishes a transactional message to a topic whose managed ledger has been closed and checks that
     * the publish callback receives an exception whose cause is a
     * {@link ManagedLedgerException.ManagedLedgerAlreadyClosedException}.
     *
     * <p>The callback only records the exception and releases the latch; the assertions run on the test
     * thread. A wrong or missing exception is therefore reported as an assertion failure with its own
     * message, rather than as a timeout caused by an assertion that threw inside the callback before the
     * latch was counted down.
     *
     * @throws Exception if the topic cannot be created or loaded, or the wait is interrupted
     */
    @Test
    public void testAppendBufferWithNotManageLedgerExceptionCanCastToMLE()
            throws Exception {
        String topic = "persistent://pulsar/system/testReCreateTopic";
        admin.topics().createNonPartitionedTopic(topic);

        PersistentTopic persistentTopic =
                (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
                        .getTopic(topic, false)
                        .get().get();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Holds whatever exception the publish callback was completed with.
        AtomicReference<Exception> publishFailure = new AtomicReference<>();
        Topic.PublishContext publishContext = new Topic.PublishContext() {

            @Override
            public String getProducerName() {
                return "test";
            }

            public long getSequenceId() {
                return  30;
            }
            /**
             * Return the producer name for the original producer.
             *
             * For messages published locally, this will return the same local producer name, though in case of
             * replicated messages, the original producer name will differ
             */
            public String getOriginalProducerName() {
                return "test";
            }

            public long getOriginalSequenceId() {
                return  30;
            }

            public long getHighestSequenceId() {
                return  30;
            }

            public long getOriginalHighestSequenceId() {
                return  30;
            }

            public long getNumberOfMessages() {
                return  30;
            }

            @Override
            public void completed(Exception e, long ledgerId, long entryId) {
                // Record first, then release the waiting test thread; no assertions on this thread.
                publishFailure.set(e);
                countDownLatch.countDown();
            }
        };

        //Close topic manageLedger.
        persistentTopic.getManagedLedger().close();

        //Publish to a closed managerLedger to test ManagerLedgerException.
        persistentTopic.publishTxnMessage(new TxnID(123L, 321L),
                Unpooled.copiedBuffer("message", UTF_8), publishContext);

        // Bounded wait for the callback, then assert on the test thread.
        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS),
                "publish callback was not invoked within 10 seconds");
        Exception failure = publishFailure.get();
        Assert.assertNotNull(failure, "publishing to a closed managed ledger should fail");
        Assert.assertTrue(failure.getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException,
                "unexpected publish failure: " + failure);
    }

    /**
     * Follows {@code maxReadPosition} of a topic's transaction buffer while normal and transactional
     * messages are published, covering the buffer states NoSnapshot, Ready (with and without an ongoing
     * transaction) and Initializing (forced through reflection).
     *
     * <p>The additional non-transactional client and both producers are closed via {@code @Cleanup} when
     * the method exits, so they are not leaked into the tests that share this broker.
     *
     * @throws Exception if any client or admin call, or the reflective field access, fails
     */
    @Test
    public void testMaxReadPositionForNormalPublish() throws Exception {
        String topic = "persistent://" + NAMESPACE1 + "/NormalPublish";
        admin.topics().createNonPartitionedTopic(topic);
        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
                .getTopic(topic, false).get().get();

        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
        @Cleanup
        PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false)
                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()).build();

        //test the state of TransactionBuffer is NoSnapshot
        //before build Producer by pulsarClient that enables transaction.
        @Cleanup
        Producer<String> normalProducer = noTxnClient.newProducer(Schema.STRING)
                .producerName("testNormalPublish")
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
        Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfNoSnapshot()));

        //test publishing normal messages will change maxReadPosition in the state of NoSnapshot.
        MessageIdImpl messageId = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
        PositionImpl position = topicTransactionBuffer.getMaxReadPosition();
        Assert.assertEquals(position.getLedgerId(), messageId.getLedgerId());
        Assert.assertEquals(position.getEntryId(), messageId.getEntryId());

        //test the state of TransactionBuffer is Ready after build Producer by pulsarClient that enables transaction.
        @Cleanup
        Producer<String> txnProducer = pulsarClient.newProducer(Schema.STRING)
                .producerName("testTransactionPublish")
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();

        Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
        //test publishing txn messages will not change maxReadPosition if don`t commit or abort.
        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
        txnProducer.newMessage(transaction).value("txn message").send();
        PositionImpl position1 = topicTransactionBuffer.getMaxReadPosition();
        Assert.assertEquals(position1.getLedgerId(), messageId.getLedgerId());
        Assert.assertEquals(position1.getEntryId(), messageId.getEntryId());

        // A normal message published while the transaction is still open must not move the position either.
        MessageIdImpl messageId2 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
        PositionImpl position2 = topicTransactionBuffer.getMaxReadPosition();
        Assert.assertEquals(position2.getLedgerId(), messageId.getLedgerId());
        Assert.assertEquals(position2.getEntryId(), messageId.getEntryId());
        transaction.commit().get();
        PositionImpl position3 = topicTransactionBuffer.getMaxReadPosition();

        // After the commit the position is expected one entry past the last normal message.
        Assert.assertEquals(position3.getLedgerId(), messageId2.getLedgerId());
        Assert.assertEquals(position3.getEntryId(), messageId2.getEntryId() + 1);

        //test publishing normal messages will change maxReadPosition if the state of TB
        //is Ready and ongoingTxns is empty.
        MessageIdImpl messageId4 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
        PositionImpl position4 = topicTransactionBuffer.getMaxReadPosition();
        Assert.assertEquals(position4.getLedgerId(), messageId4.getLedgerId());
        Assert.assertEquals(position4.getEntryId(), messageId4.getEntryId());

        //test publishing normal messages will not change maxReadPosition if the state o TB is Initializing.
        // The "state" field is looked up on TopicTransactionBufferState directly, which avoids the
        // unchecked Class cast of getClass().getSuperclass().
        Field stateField = TopicTransactionBufferState.class.getDeclaredField("state");
        stateField.setAccessible(true);
        Field maxReadPositionField = TopicTransactionBuffer.class.getDeclaredField("maxReadPosition");
        maxReadPositionField.setAccessible(true);
        stateField.set(topicTransactionBuffer, TopicTransactionBufferState.State.Initializing);
        normalProducer.newMessage().value("normal message").send();
        PositionImpl position5 = (PositionImpl) maxReadPositionField.get(topicTransactionBuffer);
        Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
        Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());
    }

    /**
     * Checks that transaction-buffer recovery still ends in the {@code "Ready"} state when reading from
     * the {@code transaction-buffer-sub} cursor fails. A mocked cursor is placed into the managed
     * ledger's cursor container and made to fail reads with, in turn, a
     * {@code NonRecoverableLedgerException} (with {@code autoSkipNonRecoverableData} enabled), a
     * {@code ManagedLedgerFencedException} and a {@code CursorAlreadyClosedException}; a new
     * {@link TopicTransactionBuffer} is created for each case.
     *
     * <p>The first case is exercised by {@code buffer1}. Without it the
     * {@code NonRecoverableLedgerException} stub would be replaced by the next stub before any buffer
     * had read through the cursor.
     *
     * @throws Exception if any client or admin call, or the reflective field access, fails
     */
    @Test
    public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
        String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
        admin.topics().createNonPartitionedTopic(topic);
        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .producerName("test")
                .enableBatching(false)
                .sendTimeout(0, TimeUnit.SECONDS)
                .topic(topic)
                .create();
        Transaction txn = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

        producer.newMessage(txn).value("test").send();

        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
                .getTopic("persistent://" + topic, false).get().get();
        persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);

        // Case 1: reads fail with a non-recoverable ledger error.
        ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
        doReturn("transaction-buffer-sub").when(managedCursor).getName();
        doReturn(true).when(managedCursor).hasMoreEntries();
        doAnswer(invocation -> {
            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
            callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
                    null);
            return null;
        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
        // Swap the real cursor in the ledger's private "cursors" container for the mock.
        Class<ManagedLedgerImpl> managedLedgerClass = ManagedLedgerImpl.class;
        Field field = managedLedgerClass.getDeclaredField("cursors");
        field.setAccessible(true);
        ManagedCursorContainer managedCursors = (ManagedCursorContainer) field.get(persistentTopic.getManagedLedger());
        managedCursors.removeCursor("transaction-buffer-sub");
        managedCursors.add(managedCursor);

        TransactionBuffer buffer1 = new TopicTransactionBuffer(persistentTopic);
        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                assertEquals(buffer1.getStats(false).state, "Ready"));

        // Case 2: reads fail because the managed ledger is fenced.
        doAnswer(invocation -> {
            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
            callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
            return null;
        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

        TransactionBuffer buffer2 = new TopicTransactionBuffer(persistentTopic);
        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                assertEquals(buffer2.getStats(false).state, "Ready"));
        managedCursors.removeCursor("transaction-buffer-sub");

        // Case 3: reads fail because the cursor is already closed.
        doAnswer(invocation -> {
            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
            callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
            return null;
        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

        managedCursors.add(managedCursor);
        TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                assertEquals(buffer3.getStats(false).state, "Ready"));
        // NOTE(review): the future returned by thenAccept is not awaited, so a failure of this assertion
        // only completes that discarded future exceptionally and does not appear to fail the test - confirm.
        persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
            assertTrue(internalStats.cursors.isEmpty());
        });
        managedCursors.removeCursor("transaction-buffer-sub");
    }

    /**
     * Checks that a {@link PendingAckHandleImpl} still reaches the "Ready" state when the cursor handed to
     * its {@link MLPendingAckStore} fails every read with a NonRecoverableLedgerException, a
     * ManagedLedgerFencedException or a CursorAlreadyClosedException.
     */
    @Test
    public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
        TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
                1, TimeUnit.MILLISECONDS);
        // The timer owns a worker thread; stop it in finally so a failed assertion does not leak it.
        try {
            String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
            admin.topics().createNonPartitionedTopic(topic);
            @Cleanup
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .producerName("test")
                    .enableBatching(false)
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .topic(topic)
                    .create();
            producer.newMessage().send();

            PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
                    .getTopic(topic, false).get().get();
            persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
            PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
                    .createSubscription("test",
                    CommandSubscribe.InitialPosition.Earliest, false, null).get();

            // Mocked cursor that claims to have entries but fails every read.
            ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
            doReturn(true).when(managedCursor).hasMoreEntries();
            doReturn(false).when(managedCursor).isClosed();
            doReturn(new PositionImpl(-1, -1)).when(managedCursor).getMarkDeletedPosition();
            doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
                callback.readEntriesFailed(
                        new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), null);
                return null;
            }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

            TransactionPendingAckStoreProvider pendingAckStoreProvider =
                    mock(TransactionPendingAckStoreProvider.class);
            doReturn(CompletableFuture.completedFuture(
                    new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null,
                            500, bufferedWriterConfig, transactionTimer)))
                    .when(pendingAckStoreProvider).newPendingAckStore(any());
            doReturn(CompletableFuture.completedFuture(true))
                    .when(pendingAckStoreProvider).checkInitializedBefore(any());

            // Inject the mocked provider into the broker through reflection.
            Class<PulsarService> pulsarServiceClass = PulsarService.class;
            Field field = pulsarServiceClass.getDeclaredField("transactionPendingAckStoreProvider");
            field.setAccessible(true);
            field.set(getPulsarServiceList().get(0), pendingAckStoreProvider);

            // Case 1: NonRecoverableLedgerException.
            PendingAckHandleImpl pendingAckHandle1 = new PendingAckHandleImpl(persistentSubscription);
            Awaitility.await().untilAsserted(() ->
                    assertEquals(pendingAckHandle1.getStats(false).state, "Ready"));

            // Case 2: ManagedLedgerFencedException.
            doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
                callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
                return null;
            }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

            PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
            Awaitility.await().untilAsserted(() ->
                    assertEquals(pendingAckHandle2.getStats(false).state, "Ready"));

            // Case 3: CursorAlreadyClosedException.
            doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
                callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
                return null;
            }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

            PendingAckHandleImpl pendingAckHandle3 = new PendingAckHandleImpl(persistentSubscription);

            Awaitility.await().untilAsserted(() ->
                    assertEquals(pendingAckHandle3.getStats(false).state, "Ready"));
        } finally {
            // cleanup
            transactionTimer.stop();
        }
    }

    /**
     * Checks that {@link MLTransactionMetadataStore#init} completes and the coordinator reports the
     * "Ready" state when the transaction-log cursor fails every read with a NonRecoverableLedgerException,
     * a ManagedLedgerFencedException or a CursorAlreadyClosedException.
     */
    @Test
    public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
                1, TimeUnit.MILLISECONDS);
        // The timer owns a worker thread; stop it in finally so a failed assertion does not leak it.
        try {
            String topic = NAMESPACE1 + "/testEndTCRecoveringWhenManagerLedgerDisReadable";
            admin.topics().createNonPartitionedTopic(topic);

            PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
                    .getTopic(topic, false).get().get();
            persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
            Map<String, String> map = new HashMap<>();
            map.put(MLTransactionSequenceIdGenerator.MAX_LOCAL_TXN_ID, "1");
            persistentTopic.getManagedLedger().setProperties(map);

            // Mocked cursor that claims to have entries but fails every read.
            ManagedCursor managedCursor = mock(ManagedCursor.class);
            doReturn(true).when(managedCursor).hasMoreEntries();
            doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
                callback.readEntriesFailed(
                        new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), null);
                return null;
            }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
            MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator =
                    new MLTransactionSequenceIdGenerator();
            persistentTopic.getManagedLedger().getConfig()
                    .setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
            MLTransactionLogImpl mlTransactionLog =
                    new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
                            persistentTopic.getManagedLedger().getConfig(), new TxnLogBufferedWriterConfig(),
                            transactionTimer);
            // Inject the mocked cursor and the topic's managed ledger into the log through reflection.
            Class<MLTransactionLogImpl> mlTransactionLogClass = MLTransactionLogImpl.class;
            Field field = mlTransactionLogClass.getDeclaredField("cursor");
            field.setAccessible(true);
            field.set(mlTransactionLog, managedCursor);
            field = mlTransactionLogClass.getDeclaredField("managedLedger");
            field.setAccessible(true);
            field.set(mlTransactionLog, persistentTopic.getManagedLedger());

            TransactionRecoverTracker transactionRecoverTracker = mock(TransactionRecoverTracker.class);
            doNothing().when(transactionRecoverTracker).appendOpenTransactionToTimeoutTracker();
            doNothing().when(transactionRecoverTracker).handleCommittingAndAbortingTransaction();
            TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class);
            doNothing().when(timeoutTracker).start();

            // Case 1: NonRecoverableLedgerException.
            MLTransactionMetadataStore metadataStore1 =
                    new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                            mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
            metadataStore1.init(transactionRecoverTracker).get();
            Awaitility.await().untilAsserted(() ->
                    assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));

            // Case 2: ManagedLedgerFencedException.
            doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
                callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
                return null;
            }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

            MLTransactionMetadataStore metadataStore2 =
                    new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                            mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
            metadataStore2.init(transactionRecoverTracker).get();
            Awaitility.await().untilAsserted(() ->
                    assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));

            // Case 3: CursorAlreadyClosedException.
            doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
                callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
                return null;
            }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

            MLTransactionMetadataStore metadataStore3 =
                    new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                            mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
            metadataStore3.init(transactionRecoverTracker).get();
            Awaitility.await().untilAsserted(() ->
                    assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));
        } finally {
            // cleanup.
            transactionTimer.stop();
        }
    }

    /**
     * Forces two client transactions into the COMMITTING and ABORTING states through reflection, then
     * checks that ending them still succeeds and that the broker interceptor counters match.
     */
    @Test
    public void testEndTxnWhenCommittingOrAborting() throws Exception {
        CounterBrokerInterceptor interceptor =
                (CounterBrokerInterceptor) getPulsarServiceList().get(0).getBrokerInterceptor();
        interceptor.reset();

        Transaction txnToCommit = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.SECONDS)
                .build()
                .get();
        Transaction txnToAbort = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.SECONDS)
                .build()
                .get();

        // Overwrite the client-side state of both transactions.
        Field stateField = TransactionImpl.class.getDeclaredField("state");
        stateField.setAccessible(true);
        stateField.set(txnToCommit, TransactionImpl.State.COMMITTING);
        stateField.set(txnToAbort, TransactionImpl.State.ABORTING);

        assertEquals(interceptor.getTxnCount(), 2);
        txnToAbort.abort().get();
        assertEquals(interceptor.getAbortedTxnCount(), 1);
        txnToCommit.commit().get();
        assertEquals(interceptor.getCommittedTxnCount(), 1);
    }

    /**
     * Checks that a new {@link TopicTransactionBuffer} becomes ready when, after a snapshot was taken,
     * the recovery cursor reports that it has no more entries to read.
     */
    @Test
    public void testNoEntryCanBeReadWhenRecovery() throws Exception {
        String topic = NAMESPACE1 + "/test";
        PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0)
                .getBrokerService()
                .getTopic(TopicName.get(topic).toString(), true)
                .get()
                .get();

        Field ledgerField = PersistentTopic.class.getDeclaredField("ledger");
        Field bufferField = PersistentTopic.class.getDeclaredField("transactionBuffer");
        ledgerField.setAccessible(true);
        bufferField.setAccessible(true);

        // Swap the topic's ledger for a spy so selected calls can be stubbed below.
        ManagedLedgerImpl spiedLedger = (ManagedLedgerImpl) spy(ledgerField.get(persistentTopic));
        ledgerField.set(persistentTopic, spiedLedger);

        // Invoke the private takeSnapshot() on the topic's current buffer and wait for it.
        TopicTransactionBuffer currentBuffer = (TopicTransactionBuffer) bufferField.get(persistentTopic);
        Method takeSnapshot = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
        takeSnapshot.setAccessible(true);
        CompletableFuture<Void> snapshotFuture = (CompletableFuture<Void>) takeSnapshot.invoke(currentBuffer);
        snapshotFuture.get();

        // Any non-durable cursor opened from now on is a mock with nothing to read.
        doReturn(PositionImpl.LATEST).when(spiedLedger).getLastConfirmedEntry();
        ManagedCursorImpl emptyCursor = mock(ManagedCursorImpl.class);
        doReturn(false).when(emptyCursor).hasMoreEntries();
        doReturn(emptyCursor).when(spiedLedger).newNonDurableCursor(any(), any());

        TopicTransactionBuffer recoveredBuffer = new TopicTransactionBuffer(persistentTopic);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue(recoveredBuffer.checkIfReady()));
    }

    /**
     * Checks that a commit issued while every transaction metadata store is in the Initializing state
     * does not complete within 5 seconds, and that the same commit future completes once the stores are
     * switched back to Ready.
     */
    @Test
    public void testRetryExceptionOfEndTxn() throws Exception{
        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS)
                .build()
                .get();
        // Resolve the reflective handle once; any reflection failure now fails the test immediately
        // instead of being printed and ignored.
        Field stateField = TransactionMetadataStoreState.class.getDeclaredField("state");
        stateField.setAccessible(true);

        CompletableFuture<Void> completableFuture;
        try {
            for (TransactionMetadataStore transactionMetadataStore : getPulsarServiceList().get(0)
                    .getTransactionMetadataStoreService().getStores().values()) {
                stateField.set(transactionMetadataStore, TransactionMetadataStoreState.State.Initializing);
            }
            completableFuture = transaction.commit();
            try {
                completableFuture.get(5, TimeUnit.SECONDS);
                fail();
            } catch (TimeoutException ignored) {
                // Expected: the commit cannot finish while the stores are not Ready.
            }
        } finally {
            // Always put the stores back to Ready so a failure here cannot affect other tests.
            for (TransactionMetadataStore transactionMetadataStore : getPulsarServiceList().get(0)
                    .getTransactionMetadataStoreService().getStores().values()) {
                stateField.set(transactionMetadataStore, TransactionMetadataStoreState.State.Ready);
            }
        }
        completableFuture.get(5, TimeUnit.SECONDS);
    }

    /**
     * Checks that the client-side {@code timeout} of a transaction is cancelled after the transaction
     * has been committed, and likewise after it has been aborted.
     */
    @Test
    public void testCancelTxnTimeout() throws Exception{
        Field timeoutField = TransactionImpl.class.getDeclaredField("timeout");
        timeoutField.setAccessible(true);

        // Commit path.
        Transaction committedTxn = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS)
                .build()
                .get();
        committedTxn.commit().get();
        Timeout commitTimeout = (Timeout) timeoutField.get(committedTxn);
        Assert.assertTrue(commitTimeout.isCancelled());

        // Abort path.
        Transaction abortedTxn = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS)
                .build()
                .get();
        abortedTxn.abort().get();
        Timeout abortTimeout = (Timeout) timeoutField.get(abortedTxn);
        Assert.assertTrue(abortTimeout.isCancelled());
    }

    /**
     * Checks that the {@code changeMaxReadPositionAndAddAbortTimes} counter of a transaction buffer in the
     * NoSnapshot state stays at zero, both initially and after syncMaxReadPositionForNormalPublish.
     */
    @Test
    public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
        String topicName = NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID();
        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                .getBrokerService()
                .getTopic(topicName, true)
                .get().get();
        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();

        Field counterField = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
        counterField.setAccessible(true);
        AtomicLong changeCounter = (AtomicLong) counterField.get(buffer);
        Field stateField = TopicTransactionBufferState.class.getDeclaredField("state");
        stateField.setAccessible(true);

        // Wait until the buffer has settled in the NoSnapshot state.
        Awaitility.await().untilAsserted(() -> {
            TopicTransactionBufferState.State currentState =
                    (TopicTransactionBufferState.State) stateField.get(buffer);
            Assert.assertEquals(currentState, TopicTransactionBufferState.State.NoSnapshot);
        });
        Assert.assertEquals(changeCounter.get(), 0L);

        buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
        Assert.assertEquals(changeCounter.get(), 0L);
    }

    /**
     * Checks that schemas exist for the TRANSACTION_BUFFER_SNAPSHOT and TOPIC_POLICY system topics of a
     * new namespace while {@code allowAutoUpdateSchemaEnabled} is switched off on every broker.
     */
    @Test
    public void testAutoCreateSchemaForTransactionSnapshot() throws Exception {
        String namespace = TENANT + "/ns2";
        String topic = namespace + "/test";
        pulsarServiceList.forEach((pulsarService ->
                pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(false)));
        try {
            admin.namespaces().createNamespace(namespace);
            admin.topics().createNonPartitionedTopic(topic);
            TopicName transactionBufferTopicName =
                    NamespaceEventsSystemTopicFactory.getSystemTopicName(
                            TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
            TopicName topicPolicyTopicName =
                    NamespaceEventsSystemTopicFactory.getSystemTopicName(
                            TopicName.get(topic).getNamespaceObject(), EventType.TOPIC_POLICY);
            Awaitility.await().untilAsserted(() -> {
                SchemaInfo snapshotSchemaInfo = admin
                        .schemas()
                        .getSchemaInfo(transactionBufferTopicName.toString());
                Assert.assertNotNull(snapshotSchemaInfo);
                SchemaInfo topicPolicySchemaInfo = admin
                        .schemas()
                        .getSchemaInfo(topicPolicyTopicName.toString());
                Assert.assertNotNull(topicPolicySchemaInfo);
            });
        } finally {
            // Re-enable the setting even when the assertions fail, so other tests are not affected.
            pulsarServiceList.forEach((pulsarService ->
                    pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
        }
    }

    /**
     * Acknowledges one message in a committed transaction and a second one in a still-open transaction,
     * then checks through the admin API that the mark-delete position of the "__pending_ack_state" cursor
     * is in the same ledger as, and two entries behind, the pending-ack log's last confirmed entry.
     */
    @Test
    public void testPendingAckMarkDeletePosition() throws Exception {
        getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(1);
        getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(5);
        String topic = NAMESPACE1 + "/test1";

        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();

        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub")
                .subscribe();
        consumer.getSubscription();

        PersistentSubscription persistentSubscription = (PersistentSubscription) getPulsarServiceList().get(0)
                .getBrokerService()
                .getTopic(topic, false)
                .get()
                .get()
                .getSubscription("sub");

        ManagedCursor subscriptionCursor = persistentSubscription.getCursor();
        subscriptionCursor.getMarkDeletedPosition();

        // First round: the pending-ack log receives message1 and the commit mark, but the persistent
        // mark-delete position has not been updated yet.
        producer.newMessage()
                .value("test".getBytes(UTF_8))
                .send();
        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.MINUTES)
                .build().get();

        Message<byte[]> firstMessage = consumer.receive(10, TimeUnit.SECONDS);

        consumer.acknowledgeAsync(firstMessage.getMessageId(), transaction);
        transaction.commit().get();

        // Second round: the subscription's persistent mark-delete position now covers message1; check
        // whether the pending-ack mark-delete position advances after another entry is appended.
        transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.MINUTES)
                .build().get();

        producer.newMessage()
                .value("test".getBytes(UTF_8))
                .send();
        Message<byte[]> secondMessage = consumer.receive(10, TimeUnit.SECONDS);
        consumer.acknowledgeAsync(secondMessage.getMessageId(), transaction);

        Awaitility.await().untilAsserted(() -> {
            ManagedLedgerInternalStats ledgerStats = admin.transactions()
                    .getPendingAckInternalStats(topic, "sub", false)
                    .pendingAckLogStats
                    .managedLedgerInternalStats;
            String[] markDeleteParts = ledgerStats.cursors.get("__pending_ack_state")
                    .markDeletePosition.split(":");
            String[] lastConfirmedParts = ledgerStats.lastConfirmedEntry.split(":");
            // Same ledger id.
            Assert.assertEquals(markDeleteParts[0], lastConfirmedParts[0]);
            // The commit mark and the uncommitted message2 are not covered.
            Assert.assertEquals(Integer.parseInt(markDeleteParts[1]),
                    Integer.parseInt(lastConfirmedParts[1]) - 2);
        });
    }

    /**
     * Closes the cursor of coordinator 0's transaction log through reflection and checks that a new
     * transaction can still be created and committed afterwards.
     */
    @Test
    public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
        TransactionMetadataStore coordinatorStore = getPulsarServiceList().get(0)
                .getTransactionMetadataStoreService()
                .getStores()
                .get(new TransactionCoordinatorID(0));

        // Dig out the log's cursor: store -> transactionLog -> cursor.
        Field logField = MLTransactionMetadataStore.class.getDeclaredField("transactionLog");
        logField.setAccessible(true);
        MLTransactionLogImpl transactionLog = (MLTransactionLogImpl) logField.get(coordinatorStore);
        Field cursorField = MLTransactionLogImpl.class.getDeclaredField("cursor");
        cursorField.setAccessible(true);
        ManagedCursorImpl logCursor = (ManagedCursorImpl) cursorField.get(transactionLog);
        logCursor.close();

        Transaction txn = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.SECONDS)
                .build()
                .get();

        txn.commit().get();
    }

    /**
     * Acknowledges a list of four message ids first in an aborted and then in a committed transaction.
     * After the abort the same four messages are redelivered to a new consumer; after the commit only
     * the one remaining, never-acknowledged message is delivered.
     */
    @Test(timeOut = 30000)
    public void testTransactionAckMessageList() throws Exception {
        String topic = "persistent://" + NAMESPACE1 +"/test";
        String subName = "testSub";

        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(5, TimeUnit.SECONDS)
                .create();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();

        for (int sent = 0; sent < 5; sent++) {
            producer.newMessage().send();
        }

        // Part 1: acknowledge the message list with a transaction that gets aborted.
        List<MessageId> ackedIds = new ArrayList<>();
        for (int received = 0; received < 4; received++) {
            Message<byte[]> received1 = consumer.receive();
            ackedIds.add(received1.getMessageId());
        }
        Transaction txn = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.MINUTES)
                .build()
                .get();

        consumer.acknowledgeAsync(ackedIds, txn);
        txn.abort().get();
        consumer.close();
        consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();
        for (int redelivered = 0; redelivered < 4; redelivered++) {
            Message<byte[]> again = consumer.receive();
            assertTrue(ackedIds.contains(again.getMessageId()));
        }

        // Part 2: acknowledge the same message list with a transaction that gets committed.
        txn = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.MINUTES)
                .build()
                .get();
        consumer.acknowledgeAsync(ackedIds, txn);
        txn.commit().get();

        consumer.close();
        consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();
        // Only the fifth message is left on the subscription.
        Message<byte[]> remaining = consumer.receive(5, TimeUnit.SECONDS);
        Assert.assertNotNull(remaining);
        Assert.assertFalse(ackedIds.contains(remaining.getMessageId()));
        remaining = consumer.receive(5, TimeUnit.SECONDS);
        Assert.assertNull(remaining);
        consumer.close();
    }


    /**
     * Acknowledges a {@link MessagesImpl} batch of four messages on a partitioned topic, first in an
     * aborted and then in a committed transaction. After the abort the same four messages are
     * redelivered to a new consumer; after the commit nothing is left to receive.
     */
    @Test(timeOut = 30000)
    public void testTransactionAckMessages() throws Exception {
        String topic = "persistent://" + NAMESPACE1 +"/testTransactionAckMessages";
        String subName = "testSub";
        admin.topics().createPartitionedTopic(topic, 2);

        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(5, TimeUnit.SECONDS)
                .create();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();

        for (int i = 0; i < 4; i++) {
            producer.newMessage().send();
        }
        // Build a MessagesImpl through the consumer's own factory method, then fill it via reflection.
        Method method = ConsumerBase.class.getDeclaredMethod("getNewMessagesImpl");
        method.setAccessible(true);

        Field field = MessagesImpl.class.getDeclaredField("messageList");
        field.setAccessible(true);

        MessagesImpl<byte[]> messages = (MessagesImpl<byte[]>) method.invoke(consumer);

        List<Message<byte[]>> messageList = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            Message<byte[]> message = consumer.receive();
            messageList.add(message);
        }
        field.set(messages, messageList);

        //verify using aborted transaction to ack messages
        Transaction transaction = pulsarClient
                .newTransaction()
                .withTransactionTimeout(5, TimeUnit.MINUTES)
                .build()
                .get();

        consumer.acknowledgeAsync(messages, transaction);
        transaction.abort().get();
        consumer.close();
        consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();
        List<MessageId> messageIds = new ArrayList<>();
        for (Message<byte[]> message : messageList) {
            messageIds.add(message.getMessageId());
        }
        for (int i = 0; i < 4; i++) {
            Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
            // A timed-out receive returns null; report that as an assertion failure rather than an NPE.
            Assert.assertNotNull(message);
            assertTrue(messageIds.contains(message.getMessageId()));
        }

        //verify using committed transaction to ack messages
        transaction = pulsarClient
                .newTransaction()
                .withTransactionTimeout(5, TimeUnit.MINUTES)
                .build()
                .get();
        consumer.acknowledgeAsync(messages, transaction);
        transaction.commit().get();

        consumer.close();
        consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();
        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
        Assert.assertNull(message);
        consumer.close();
    }

    /**
     * Invokes the consumer's {@code connectionClosed} handler through Whitebox and checks that a
     * transactional acknowledgement issued afterwards fails with a
     * {@link PulsarClientException.ConnectException}.
     */
    @Test
    public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception {
        String topic = NAMESPACE1 + "/testGetConnectExceptionForAckMsgWhenCnxIsNull";
        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();

        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub")
                .subscribe();

        for (int value = 0; value < 10; value++) {
            producer.newMessage().value(Bytes.toBytes(value)).send();
        }

        // Tell the consumer that its current connection has been closed.
        ClientCnx currentCnx = Whitebox.invokeMethod(consumer, "cnx");
        Whitebox.invokeMethod(consumer, "connectionClosed", currentCnx);

        Message<byte[]> received = consumer.receive();
        Transaction txn = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.SECONDS)
                .build().get();

        try {
            consumer.acknowledgeAsync(received.getMessageId(), txn).get();
            fail();
        } catch (ExecutionException ackFailure) {
            Assert.assertTrue(ackFailure.getCause() instanceof PulsarClientException.ConnectException);
        }
    }


    /**
     * With batch-index acknowledgement enabled, acknowledges the first message of a batch in a committed
     * transaction and the second in an aborted one, then checks that the second message can be
     * acknowledged again in a new transaction.
     */
    @Test
    public void testPendingAckBatchMessageCommit() throws Exception {
        String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit";

        // enable batch index ack
        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);

        @Cleanup
        Producer<byte[]> producer = pulsarClient
                .newProducer(Schema.BYTES)
                .topic(topic)
                .enableBatching(true)
                // ensure that batch message is sent
                .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();

        @Cleanup
        Consumer<byte[]> consumer = pulsarClient
                .newConsumer()
                .subscriptionType(SubscriptionType.Shared)
                .topic(topic)
                .subscriptionName("sub")
                .subscribe();

        // send batch message, the size is 5
        for (int i = 0; i < 5; i++) {
            // Explicit charset, consistent with the other tests in this file; no platform default.
            producer.sendAsync(("test" + i).getBytes(UTF_8));
        }

        producer.flush();

        Transaction txn1 = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
        // ack the first message with transaction
        consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn1).get();
        Transaction txn2 = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
        // ack the second message with transaction
        MessageId messageId = consumer.receive().getMessageId();
        consumer.acknowledgeAsync(messageId, txn2).get();

        // commit the txn1
        txn1.commit().get();
        // abort the txn2
        txn2.abort().get();

        Transaction txn3 = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
        // ack the second message again; this must succeed because txn2 was aborted
        consumer.acknowledgeAsync(messageId, txn3).get();
    }

    /**
     * When change pending ack handle state failure, exceptionally complete cmd-subscribe.
     * see: https://github.com/apache/pulsar/pull/16248.
     *
     * <p>The mocked pending-ack store closes the handle before reporting replay completion, and the test
     * expects {@code addConsumer} to fail with a ServiceUnitNotReadyException.
     */
    @Test
    public void testPendingAckReplayChangeStateError() throws InterruptedException, TimeoutException {
        // Create Executor
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        // Shut the executor down in finally so its thread does not outlive the test.
        try {
            // Mock serviceConfiguration.
            ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
            when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
            // Mock executorProvider.
            ExecutorProvider executorProvider = mock(ExecutorProvider.class);
            when(executorProvider.getExecutor()).thenReturn(executorService);
            when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService);
            // Mock pendingAckStore.
            PendingAckStore pendingAckStore = mock(PendingAckStore.class);
            doAnswer(new Answer<Object>() {
                @Override
                public Object answer(InvocationOnMock invocation) throws Throwable {
                    executorService.execute(()->{
                        PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0];
                        // Close the handle first, then report the replay as complete.
                        pendingAckHandle.close();
                        MLPendingAckReplyCallBack mlPendingAckReplyCallBack
                                = new MLPendingAckReplyCallBack(pendingAckHandle);
                        mlPendingAckReplyCallBack.replayComplete();
                    });
                    return null;
                }
            }).when(pendingAckStore).replayAsync(any(), any());
            // Mock pendingAckStoreProvider.
            TransactionPendingAckStoreProvider pendingAckStoreProvider =
                    mock(TransactionPendingAckStoreProvider.class);
            when(pendingAckStoreProvider.checkInitializedBefore(any()))
                    .thenReturn(CompletableFuture.completedFuture(true));
            when(pendingAckStoreProvider.newPendingAckStore(any()))
                    .thenReturn(CompletableFuture.completedFuture(pendingAckStore));
            // Mock pulsar.
            PulsarService pulsar = mock(PulsarService.class);
            when(pulsar.getConfig()).thenReturn(serviceConfiguration);
            when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
            when(pulsar.getTransactionPendingAckStoreProvider()).thenReturn(pendingAckStoreProvider);
            // Mock brokerService.
            BrokerService brokerService = mock(BrokerService.class);
            when(brokerService.getPulsar()).thenReturn(pulsar);
            when(brokerService.pulsar()).thenReturn(pulsar);
            // Mock topic.
            PersistentTopic topic = mock(PersistentTopic.class);
            when(topic.getBrokerService()).thenReturn(brokerService);
            when(topic.getName()).thenReturn("topic-a");
            // Mock cursor for subscription.
            ManagedCursor cursor_subscription = mock(ManagedCursor.class);
            doThrow(new RuntimeException("1")).when(cursor_subscription).updateLastActive();
            // Create subscription.
            String subscriptionName = "sub-a";
            boolean replicated = false;
            Map<String, String> subscriptionProperties = Collections.emptyMap();
            PersistentSubscription persistentSubscription = new PersistentSubscription(topic, subscriptionName,
                    cursor_subscription, replicated, subscriptionProperties);
            org.apache.pulsar.broker.service.Consumer consumer =
                    mock(org.apache.pulsar.broker.service.Consumer.class);
            try {
                CompletableFuture<Void> addConsumerFuture = persistentSubscription.addConsumer(consumer);
                addConsumerFuture.get(5, TimeUnit.SECONDS);
                fail("Expect failure by PendingAckHandle closed, but success");
            } catch (ExecutionException executionException){
                Throwable t = executionException.getCause();
                Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
            }
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * Verifies that the transaction buffer (TB) recover check completes exceptionally when the TB
     * has already been closed by the time recovery finishes.
     *
     * <p>The mocked recover executor closes the TB and only then invokes {@code recoverComplete()} on
     * the recover callback. The test expects {@code checkIfTransactionBufferRecoverCompletely(true)}
     * to fail with {@code BrokerServiceException.ServiceUnitNotReadyException}.
     * see: https://github.com/apache/pulsar/pull/16248.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the check.
     * @throws TimeoutException if the check does not complete within 5 seconds.
     */
    @Test
    public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException {
        // Holder so that the mocked executor (defined before the topic exists) can reach the topic later.
        final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<>();
        // Create Executor
        ScheduledExecutorService recoverExecutor = mock(ScheduledExecutorService.class);
        // Mock serviceConfiguration.
        ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
        when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false);
        when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
        // Mock executorProvider.
        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
        when(executorProvider.getExecutor(any(Object.class))).thenReturn(recoverExecutor);
        // Mock pendingAckStore.
        PendingAckStore pendingAckStore = mock(PendingAckStore.class);
        // Replace the recover task execution: close the TB first, then report recover completion.
        doAnswer(invocation -> {
            // The work is done on a new thread, so execute() itself returns immediately.
            new Thread(() -> {
                TopicTransactionBuffer.TopicTransactionBufferRecover recover =
                        (TopicTransactionBuffer.TopicTransactionBufferRecover) invocation.getArguments()[0];
                TopicTransactionBufferRecoverCallBack callBack =
                        Whitebox.getInternalState(recover, "callBack");
                try {
                    persistentTopic.get().getTransactionBuffer().closeAsync().get();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before propagating.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
                callBack.recoverComplete();
            }).start();
            return null;
        }).when(recoverExecutor).execute(any());
        // Mock pendingAckStoreProvider.
        // NOTE(review): this provider is not returned by the pulsar mock below, so these stubs look
        // unused in this test - confirm and consider removing.
        TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
        when(pendingAckStoreProvider.checkInitializedBefore(any()))
                .thenReturn(CompletableFuture.completedFuture(true));
        when(pendingAckStoreProvider.newPendingAckStore(any()))
                .thenReturn(CompletableFuture.completedFuture(pendingAckStore));
        // Mock TransactionBufferSnapshotService
        TransactionBufferSnapshotService transactionBufferSnapshotService
                = mock(TransactionBufferSnapshotService.class);
        SystemTopicClient.Writer writer = mock(SystemTopicClient.Writer.class);
        when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
        when(transactionBufferSnapshotService.createWriter(any()))
                .thenReturn(CompletableFuture.completedFuture(writer));
        // Mock pulsar.
        PulsarService pulsar = mock(PulsarService.class);
        when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
        when(pulsar.getConfig()).thenReturn(serviceConfiguration);
        when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
        when(pulsar.getTransactionBufferSnapshotService()).thenReturn(transactionBufferSnapshotService);
        TopicTransactionBufferProvider topicTransactionBufferProvider = new TopicTransactionBufferProvider();
        when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider);
        // Mock BacklogQuotaManager
        BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
        // Mock brokerService.
        BrokerService brokerService = mock(BrokerService.class);
        when(brokerService.getPulsar()).thenReturn(pulsar);
        when(brokerService.pulsar()).thenReturn(pulsar);
        when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);
        // Mock managedLedger.
        ManagedLedgerImpl managedLedger = mock(ManagedLedgerImpl.class);
        ManagedCursorContainer managedCursors = new ManagedCursorContainer();
        when(managedLedger.getCursors()).thenReturn(managedCursors);
        PositionImpl position = PositionImpl.EARLIEST;
        when(managedLedger.getLastConfirmedEntry()).thenReturn(position);
        // Create topic.
        persistentTopic.set(new PersistentTopic("topic-a", managedLedger, brokerService));
        try {
            // Do check.
            persistentTopic.get().checkIfTransactionBufferRecoverCompletely(true).get(5, TimeUnit.SECONDS);
            fail("Expect failure by TB closed, but it is finished.");
        } catch (ExecutionException executionException) {
            Throwable t = executionException.getCause();
            assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
        }
    }
}
