/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.compaction;

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Deleted;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Init;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;

import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-compaction")
public class ServiceUnitStateCompactionTest extends MockedPulsarServiceBaseTest {
    private ScheduledExecutorService compactionScheduler;
    private BookKeeper bk;
    private Schema<ServiceUnitStateData> schema;
    private ServiceUnitStateCompactionStrategy strategy;

    private ServiceUnitState testState = Init;

    private ServiceUnitStateData testData = null;

    private static Random RANDOM = new Random();


    private ServiceUnitStateData testValue(ServiceUnitState state, String broker) {
        if (state == Init) {
            testData = null;
        } else {
            testData = new ServiceUnitStateData(state, broker, versionId(testData) + 1);
        }

        return testData;
    }

    private ServiceUnitStateData testValue(String broker) {
        testState = nextValidStateNonSplit(testState);
        return testValue(testState, broker);
    }

    private ServiceUnitState nextValidState(ServiceUnitState from) {
        List<ServiceUnitState> candidates = Arrays.stream(ServiceUnitState.values())
                .filter(to -> isValidTransition(from, to))
                .collect(Collectors.toList());
        var state=  candidates.get(RANDOM.nextInt(candidates.size()));
        return state;
    }

    private ServiceUnitState nextValidStateNonSplit(ServiceUnitState from) {
        List<ServiceUnitState> candidates = Arrays.stream(ServiceUnitState.values())
                .filter(to -> to != Init && to != Splitting && to != Deleted
                        && isValidTransition(from, to))
                .collect(Collectors.toList());
        var state=  candidates.get(RANDOM.nextInt(candidates.size()));
        return state;
    }

    private ServiceUnitState nextInvalidState(ServiceUnitState from) {
        List<ServiceUnitState> candidates = Arrays.stream(ServiceUnitState.values())
                .filter(to -> !isValidTransition(from, to))
                .collect(Collectors.toList());
        if (candidates.size() == 0) {
            return Init;
        }
        return candidates.get(RANDOM.nextInt(candidates.size()));
    }

    @BeforeMethod
    @Override
    public void setup() throws Exception {
        super.internalSetup();

        admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
        admin.tenants().createTenant("my-property",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
        admin.namespaces().createNamespace("my-property/use/my-ns");

        compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
        schema = Schema.JSON(ServiceUnitStateData.class);
        strategy = new ServiceUnitStateCompactionStrategy();
        strategy.checkBrokers(false);

        testState = Init;
        testData = null;
    }


    @AfterMethod(alwaysRun = true)
    @Override
    public void cleanup() throws Exception {
        super.internalCleanup();
        bk.close();
        if (compactionScheduler != null) {
            compactionScheduler.shutdownNow();
        }
    }


    public record TestData(
            String topic,
            Map<String, ServiceUnitStateData> expected,
            List<Pair<String, ServiceUnitStateData>> all) {

    }
    TestData generateTestData() throws PulsarAdminException, PulsarClientException {
        String topic = "persistent://my-property/use/my-ns/my-topic1";
        final int numMessages = 20;
        final int maxKeys = 5;

        // Configure retention to ensue data is retained for reader
        admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1));

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
                .topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        Map<String, ServiceUnitStateData> expected = new HashMap<>();
        List<Pair<String, ServiceUnitStateData>> all = new ArrayList<>();
        Random r = new Random(0);

        pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe().close();

        for (int j = 0; j < numMessages; j++) {
            int keyIndex = r.nextInt(maxKeys);
            String key = "key" + keyIndex;
            ServiceUnitStateData prev = expected.get(key);
            ServiceUnitState prevState = state(prev);
            boolean invalid =  r.nextBoolean();
            ServiceUnitState state = invalid ? nextInvalidState(prevState) :
                    nextValidState(prevState);
            ServiceUnitStateData value;
            long versionId = versionId(prev) + 1;
            if (invalid) {
                value = new ServiceUnitStateData(state, key + ":" + j, false, versionId);
            } else {
                if (state == Init) {
                    value = new ServiceUnitStateData(state, key + ":" + j, true, versionId);
                } else {
                    value = new ServiceUnitStateData(state, key + ":" + j, false, versionId);
                }
            }

            producer.newMessage().key(key).value(value).send();
            if (!strategy.shouldKeepLeft(prev, value)) {
                expected.put(key, value);
            }
            all.add(Pair.of(key, value));
        }
        return new TestData(topic, expected, all);
    }

    @Test
    public void testCompaction() throws Exception {
        TestData testData = generateTestData();
        var topic = testData.topic;
        var expected = testData.expected;
        var all = testData.all;

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
        // Compacted topic ledger should have same number of entry equals to number of unique key.
        //Assert.assertEquals(internalStats.compactedLedger.entries, expected.size());
        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
        Assert.assertFalse(internalStats.compactedLedger.offloaded);

        // consumer with readCompacted enabled only get compacted entries
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            while (true) {
                Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertEquals(expected.remove(m.getKey()), m.getValue());
                if (expected.isEmpty()) {
                    break;
                }
            }
            Assert.assertTrue(expected.isEmpty());
        }

        // can get full backlog if read compacted disabled
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(false).subscribe()) {
            while (true) {
                Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
                Pair<String, ServiceUnitStateData> expectedMessage = all.remove(0);
                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
                Assert.assertEquals(expectedMessage.getRight(), m.getValue());
                if (all.isEmpty()) {
                    break;
                }
            }
            Assert.assertTrue(all.isEmpty());
        }
    }

    @Test
    public void testCompactionWithReader() throws Exception {
        TestData testData = generateTestData();
        var topic = testData.topic;
        var expected = testData.expected;
        var all = testData.all;

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        // consumer with readCompacted enabled only get compacted entries
        try (Reader<ServiceUnitStateData> reader = pulsarClient.newReader(schema).topic(topic).readCompacted(true)
                .startMessageId(MessageId.earliest).create()) {
            while (true) {
                Message<ServiceUnitStateData> m = reader.readNext(2, TimeUnit.SECONDS);
                Assert.assertEquals(expected.remove(m.getKey()), m.getValue());
                if (expected.isEmpty()) {
                    break;
                }
            }
            Assert.assertTrue(expected.isEmpty());
        }

        // can get full backlog if read compacted disabled
        try (Reader<ServiceUnitStateData> reader = pulsarClient.newReader(schema).topic(topic).readCompacted(false)
                .startMessageId(MessageId.earliest).create()) {
            while (true) {
                Message<ServiceUnitStateData> m = reader.readNext(2, TimeUnit.SECONDS);
                Pair<String, ServiceUnitStateData> expectedMessage = all.remove(0);
                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
                Assert.assertEquals(expectedMessage.getRight(), m.getValue());
                if (all.isEmpty()) {
                    break;
                }
            }
            Assert.assertTrue(all.isEmpty());
        }
    }


    @Test
    public void testCompactionWithTableview() throws Exception {
        var tv = pulsar.getClient().newTableViewBuilder(schema)
                .topic("persistent://my-property/use/my-ns/my-topic1")
                .loadConf(Map.of(
                        "topicCompactionStrategyClassName",
                        ServiceUnitStateCompactionStrategy.class.getName()))
                .create();

        ((ServiceUnitStateCompactionStrategy)
                FieldUtils.readDeclaredField(tv, "compactionStrategy", true))
                .checkBrokers(false);
        TestData testData = generateTestData();
        var topic = testData.topic;
        var expected = testData.expected;
        var expectedCopy = new HashMap<>(expected);

        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() -> assertEquals(expectedCopy.size(), tv.size()));

        for(var etr : tv.entrySet()){
            Assert.assertEquals(expectedCopy.remove(etr.getKey()), etr.getValue());
            if (expectedCopy.isEmpty()) {
                break;
            }
        }

        Assert.assertTrue(expectedCopy.isEmpty());
        tv.close();;

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        // consumer with readCompacted enabled only get compacted entries
        var tableview = pulsar.getClient().newTableView(schema)
                .topic(topic)
                .loadConf(Map.of(
                        "topicCompactionStrategyClassName",
                        ServiceUnitStateCompactionStrategy.class.getName()))
                .create();

        for(var etr : tableview.entrySet()){
            Assert.assertEquals(expected.remove(etr.getKey()), etr.getValue());
            if (expected.isEmpty()) {
                break;
            }
        }
        Assert.assertTrue(expected.isEmpty());
        tableview.close();

    }


    @Test
    public void testReadCompactedBeforeCompaction() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
                .topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .create();

        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
        String key = "key0";
        var testValues = Arrays.asList(
                testValue("content0"), testValue("content1"), testValue("content2"));
        for (var val : testValues) {
            producer.newMessage().key(key).value(val).send();
        }

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<ServiceUnitStateData> m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(0));

            m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(1));

            m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(2));
        }

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<ServiceUnitStateData> m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(2));
        }
    }

    @Test
    public void testReadEntriesAfterCompaction() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
                .topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .create();

        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        String key = "key0";
        var testValues = Arrays.asList(
                testValue( "content0"),
                testValue("content1"),
                testValue( "content2"),
                testValue("content3"));
        producer.newMessage().key(key).value(testValues.get(0)).send();
        producer.newMessage().key(key).value(testValues.get(1)).send();
        producer.newMessage().key(key).value(testValues.get(2)).send();

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        producer.newMessage().key(key).value(testValues.get(3)).send();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<ServiceUnitStateData> m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(2));

            m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(3));
        }
    }

    @Test
    public void testSeekEarliestAfterCompaction() throws Exception {

        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
                .topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .create();

        String key = "key0";
        var testValues = Arrays.asList(
                testValue("content0"),
                testValue("content1"),
                testValue("content2"));
        for (var val : testValues) {
            producer.newMessage().key(key).value(val).send();
        }

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            consumer.seek(MessageId.earliest);
            Message<ServiceUnitStateData> m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(2));
        }

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(false).subscribe()) {
            consumer.seek(MessageId.earliest);

            Message<ServiceUnitStateData> m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(0));

            m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(1));

            m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(2));
        }
    }

    @Test
    public void testSlowTableviewAfterCompaction() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";
        String strategyClassName = "topicCompactionStrategyClassName";
        strategy.checkBrokers(true);

        pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe().close();

        var fastTV = pulsar.getClient().newTableViewBuilder(schema)
                .topic(topic)
                .subscriptionName("fastTV")
                .loadConf(Map.of(
                        strategyClassName,
                        ServiceUnitStateCompactionStrategy.class.getName()))
                .create();

        var defaultConf = getDefaultConf();
        var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
        var pulsar2 = additionalPulsarTestContext.getPulsarService();

        var slowTV = pulsar2.getClient().newTableViewBuilder(schema)
                .topic(topic)
                .subscriptionName("slowTV")
                .loadConf(Map.of(
                        strategyClassName,
                        ServiceUnitStateCompactionStrategy.class.getName()))
                .create();

        var semaphore = new Semaphore(0);
        AtomicBoolean handledReleased = new AtomicBoolean(false);

        slowTV.listen((k, v) -> {
            if (v.state() == Assigning) {
                try {
                    // Stuck at handling Assigned
                    handledReleased.set(false);
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } else if (v.state() == Releasing) {
                handledReleased.set(true);
            }
        });

        // Configure retention to ensue data is retained for reader
        admin.namespaces().setRetention("my-property/use/my-ns",
                new RetentionPolicies(-1, -1));

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
                .topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);

        String bundle = "bundle1";
        String src = "broker0";
        String dst = "broker1";
        long versionId = 1;
        producer.newMessage().key(bundle).value(new ServiceUnitStateData(Owned, src, versionId++)).send();
        for (int i = 0; i < 3; i++) {
            var releasedStateData = new ServiceUnitStateData(Releasing, dst, src, versionId++);
            producer.newMessage().key(bundle).value(releasedStateData).send();
            producer.newMessage().key(bundle).value(releasedStateData).send();
            var assignedStateData = new ServiceUnitStateData(Assigning, dst, src, versionId++);
            producer.newMessage().key(bundle).value(assignedStateData).send();
            producer.newMessage().key(bundle).value(assignedStateData).send();
            var ownedStateData = new ServiceUnitStateData(Owned, dst, src, versionId++);
            producer.newMessage().key(bundle).value(ownedStateData).send();
            producer.newMessage().key(bundle).value(ownedStateData).send();
            compactor.compact(topic, strategy).get();

            Awaitility.await()
                    .pollInterval(200, TimeUnit.MILLISECONDS)
                    .atMost(10, TimeUnit.SECONDS)
                    .untilAsserted(() -> assertEquals(fastTV.get(bundle), ownedStateData));

            Awaitility.await()
                    .pollInterval(200, TimeUnit.MILLISECONDS)
                    .atMost(10, TimeUnit.SECONDS)
                    .untilAsserted(() -> assertEquals(slowTV.get(bundle), assignedStateData));
            assertTrue(!handledReleased.get());
            semaphore.release();

            Awaitility.await()
                    .pollInterval(200, TimeUnit.MILLISECONDS)
                    .atMost(10, TimeUnit.SECONDS)
                    .untilAsserted(() -> assertEquals(slowTV.get(bundle), ownedStateData));

            var newTv = pulsar.getClient().newTableView(schema)
                    .topic(topic)
                    .loadConf(Map.of(
                            strategyClassName,
                            ServiceUnitStateCompactionStrategy.class.getName()))
                    .create();
            Awaitility.await()
                    .pollInterval(200, TimeUnit.MILLISECONDS)
                    .atMost(10, TimeUnit.SECONDS)
                    .untilAsserted(() -> assertEquals(newTv.get(bundle), ownedStateData));

            src = dst;
            dst = "broker" + (i + 2);
            newTv.close();
        }

        producer.close();
        slowTV.close();
        fastTV.close();
        pulsar2.close();

    }

    @Test
    public void testSlowReceiveTableviewAfterCompaction() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";
        String strategyClassName = "topicCompactionStrategyClassName";

        pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe().close();

        var tv = pulsar.getClient().newTableViewBuilder(schema)
                .topic(topic)
                .subscriptionName("slowTV")
                .loadConf(Map.of(
                        strategyClassName,
                        ServiceUnitStateCompactionStrategy.class.getName()))
                .create();

        // Configure retention to ensue data is retained for reader
        admin.namespaces().setRetention("my-property/use/my-ns",
                new RetentionPolicies(-1, -1));

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
                .topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);

        var reader = ((CompletableFuture<ReaderImpl<ServiceUnitStateData>>) FieldUtils
                .readDeclaredField(tv, "reader", true)).get();
        var consumer = spy(reader.getConsumer());
        FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);
        String bundle = "bundle1";
        final AtomicInteger versionId = new AtomicInteger(0);
        final AtomicInteger cnt = new AtomicInteger(1);
        int msgAddCount = 1000; // has to be big enough to cover compacted cursor fast-forward.
        doAnswer(invocationOnMock -> {
            if (cnt.decrementAndGet() == 0) {
                var msg = consumer.receiveAsync();
                for (int i = 0; i < msgAddCount; i++) {
                    producer.newMessage().key(bundle).value(
                            new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
                                    versionId.get())).send();
                }
                compactor.compact(topic, strategy).join();
                return msg;
            }
            // Call the real method
            reset(consumer);
            return consumer.receiveAsync();
        }).when(consumer).receiveAsync();
        producer.newMessage().key(bundle).value(
                new ServiceUnitStateData(Owned, "broker", true,
                        versionId.incrementAndGet())).send();
        producer.newMessage().key(bundle).value(
                new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
                        versionId.get())).send();
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
                () -> {
                    var val = tv.get(bundle);
                    assertNotNull(val);
                    assertEquals(val.dstBroker(), "broker" + versionId.get());
                }
        );

        producer.close();
        tv.close();
    }

    @Test
    public void testBrokerRestartAfterCompaction() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
                .topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .create();
        String key = "key0";
        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        var testValues =  Arrays.asList(
                testValue("content0"), testValue("content1"), testValue("content2"));
        for (var val : testValues) {
            producer.newMessage().key(key).value(val).send();
        }
        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<ServiceUnitStateData> m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(testValues.size() - 1));
        }

        stopBroker();
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            consumer.receive();
            Assert.fail("Shouldn't have been able to receive anything");
        } catch (PulsarClientException e) {
            // correct behaviour
        }
        startBroker();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<ServiceUnitStateData> m = consumer.receive();
            Assert.assertEquals(m.getKey(), key);
            Assert.assertEquals(m.getValue(), testValues.get(testValues.size() - 1));
        }
    }

    @Test
    public void testCompactEmptyTopic() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
                .topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .create();

        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        var testValue =  testValue( "content0");
        producer.newMessage().key("key0").value(testValue).send();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<ServiceUnitStateData> m = consumer.receive();
            Assert.assertEquals(m.getKey(), "key0");
            Assert.assertEquals(m.getValue(), testValue);
        }
    }

    @Test
    public void testWholeBatchCompactedOut() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // subscribe before sending anything, so that we get all messages
        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        try (Producer<ServiceUnitStateData> producerNormal = pulsarClient.newProducer(schema).topic(topic)
                .enableBatching(true)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
             Producer<ServiceUnitStateData> producerBatch = pulsarClient.newProducer(schema).topic(topic)
                     .maxPendingMessages(3)
                     .enableBatching(true)
                     .batchingMaxMessages(3)
                     .batchingMaxPublishDelay(1, TimeUnit.HOURS)
                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
                     .create()) {
            producerBatch.newMessage().key("key1").value(testValue("my-message-1")).sendAsync();
            producerBatch.newMessage().key("key1").value(testValue( "my-message-2")).sendAsync();
            producerBatch.newMessage().key("key1").value(testValue("my-message-3")).sendAsync();
            producerNormal.newMessage().key("key1").value(testValue( "my-message-4")).send();
        }

        // compact the topic
        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<ServiceUnitStateData> message = consumer.receive();
            Assert.assertEquals(message.getKey(), "key1");
            Assert.assertEquals(new String(message.getValue().dstBroker()), "my-message-4");
        }
    }

    public void testCompactionWithLastDeletedKey() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema).topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .messageRoutingMode(MessageRoutingMode.SinglePartition).create();

        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        producer.newMessage().key("1").value(testValue("1")).send();
        producer.newMessage().key("2").value(testValue("3")).send();
        producer.newMessage().key("3").value(testValue( "5")).send();
        producer.newMessage().key("1").value(null).send();
        producer.newMessage().key("2").value(null).send();

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        Set<String> expected = Sets.newHashSet("3");
        // consumer with readCompacted enabled only get compacted entries
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
            assertTrue(expected.remove(m.getKey()));
        }
    }

    @Test(timeOut = 20000)
    public void testEmptyCompactionLedger() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema).topic(topic)
                .compressionType(MSG_COMPRESSION_TYPE)
                .enableBatching(true)
                .messageRoutingMode(MessageRoutingMode.SinglePartition).create();

        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        producer.newMessage().key("1").value(testValue(Owned, "1")).send();
        producer.newMessage().key("2").value(testValue(Owned, "3")).send();
        producer.newMessage().key("1").value(null).send();
        producer.newMessage().key("2").value(null).send();

        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        // consumer with readCompacted enabled only get compacted entries
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(m);
        }
    }

    @Test(timeOut = 20000)
    public void testAllEmptyCompactionLedger() throws Exception {
        final String topic =
                "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID().toString();

        final int messages = 10;

        // 1.create producer and publish message to the topic.
        ProducerBuilder<ServiceUnitStateData> builder = pulsarClient.newProducer(schema)
                .compressionType(MSG_COMPRESSION_TYPE).topic(topic);
        builder.batchingMaxMessages(messages / 5);

        Producer<ServiceUnitStateData> producer = builder.create();

        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key("1").value(null).sendAsync());
        }

        FutureUtil.waitForAll(futures).get();

        // 2.compact the topic.
        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        // consumer with readCompacted enabled only get compacted entries
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
            Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(m);
        }
    }

    @Test(timeOut = 20000)
    public void testCompactMultipleTimesWithoutEmptyMessage()
            throws PulsarClientException, ExecutionException, InterruptedException {
        final String topic =
                "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID()
                        .toString();

        final int messages = 10;
        final String key = "1";

        // 1.create producer and publish message to the topic.
        ProducerBuilder<ServiceUnitStateData> builder = pulsarClient.newProducer(schema).topic(topic);
        builder.compressionType(MSG_COMPRESSION_TYPE);
        builder.enableBatching(true);


        Producer<ServiceUnitStateData> producer = builder.create();

        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue((i + ""))).sendAsync());
        }

        FutureUtil.waitForAll(futures).get();

        // 2.compact the topic.
        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        // 3. Send more ten messages
        futures.clear();
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue((i + 10 + ""))).sendAsync());
        }
        FutureUtil.waitForAll(futures).get();

        // 4.compact again.
        compactor.compact(topic, strategy).get();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
            Message<ServiceUnitStateData> m1 = consumer.receive();
            assertNotNull(m1);
            assertEquals(m1.getKey(), key);
            assertEquals(m1.getValue().dstBroker(), "19");
            Message<ServiceUnitStateData> none = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(none);
        }
    }

    @Test(timeOut = 200000)
    public void testReadUnCompacted()
            throws PulsarClientException, ExecutionException, InterruptedException {
        final String topic = "persistent://my-property/use/my-ns/testReadUnCompacted" + UUID.randomUUID().toString();

        final int messages = 10;
        final String key = "1";

        // 1.create producer and publish message to the topic.
        ProducerBuilder<ServiceUnitStateData> builder = pulsarClient.newProducer(schema).topic(topic);
        builder.compressionType(MSG_COMPRESSION_TYPE);
        builder.batchingMaxMessages(messages / 5);

        Producer<ServiceUnitStateData> producer = builder.create();

        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue((i + ""))).sendAsync());
        }

        FutureUtil.waitForAll(futures).get();

        // 2.compact the topic.
        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();

        // 3. Send more ten messages
        futures.clear();
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue((i + 10 + ""))).sendAsync());
        }
        FutureUtil.waitForAll(futures).get();
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            for (int i = 0; i < 11; i++) {
                Message<ServiceUnitStateData> received = consumer.receive();
                assertNotNull(received);
                assertEquals(received.getKey(), key);
                assertEquals(received.getValue().dstBroker(), i + 9 + "");
                consumer.acknowledge(received);
            }
            Message<ServiceUnitStateData> none = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(none);
        }

        // 4.Send empty message to delete the key-value in the compacted topic.
        producer.newMessage().key(key).value(null).send();

        // 5.compact the topic.
        compactor.compact(topic, strategy).get();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub2")
                .readCompacted(true)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            Message<ServiceUnitStateData> none = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(none);
        }

        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue((i + 20 + ""))).sendAsync());
        }
        FutureUtil.waitForAll(futures).get();

        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub3")
                .readCompacted(true)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            for (int i = 0; i < 10; i++) {
                Message<ServiceUnitStateData> received = consumer.receive();
                assertNotNull(received);
                assertEquals(received.getKey(), key);
                assertEquals(received.getValue().dstBroker(), i + 20 + "");
                consumer.acknowledge(received);
            }
            Message<ServiceUnitStateData> none = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(none);
        }
    }

    public static long versionId(ServiceUnitStateData data) {
        return data == null ? ServiceUnitStateChannelImpl.VERSION_ID_INIT - 1 : data.versionId();
    }
}
