/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.loadbalance.extensions.channel;

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Deleted;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Init;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Assign;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Split;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {

    private PulsarService pulsar1;
    private PulsarService pulsar2;
    private ServiceUnitStateChannel channel1;
    private ServiceUnitStateChannel channel2;
    private String lookupServiceAddress1;
    private String lookupServiceAddress2;
    private String bundle;
    private String bundle1;
    private String bundle2;
    private String bundle3;
    private String childBundle1Range;
    private String childBundle2Range;
    private String childBundle11;
    private String childBundle12;

    private String childBundle31;
    private String childBundle32;
    private PulsarTestContext additionalPulsarTestContext;

    private LoadManagerContext loadManagerContext;

    private BrokerRegistryImpl registry;

    private ExtensibleLoadManagerImpl loadManager;

    @BeforeClass
    @Override
    protected void setup() throws Exception {
        conf.setAllowAutoTopicCreation(true);
        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
        conf.setLoadBalancerDebugModeEnabled(true);
        conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
        super.internalSetup(conf);

        admin.tenants().createTenant("pulsar", createDefaultTenantInfo());
        admin.namespaces().createNamespace("pulsar/system");
        admin.tenants().createTenant("public", createDefaultTenantInfo());
        admin.namespaces().createNamespace("public/default");

        pulsar1 = pulsar;
        registry = new BrokerRegistryImpl(pulsar);
        loadManagerContext = mock(LoadManagerContext.class);
        doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore();
        doReturn(mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore();
        loadManager = mock(ExtensibleLoadManagerImpl.class);
        additionalPulsarTestContext = createAdditionalPulsarTestContext(getDefaultConf());
        pulsar2 = additionalPulsarTestContext.getPulsarService();

        channel1 = createChannel(pulsar1);
        channel1.start();

        channel2 = createChannel(pulsar2);
        channel2.start();
        lookupServiceAddress1 = (String)
                FieldUtils.readDeclaredField(channel1, "lookupServiceAddress", true);
        lookupServiceAddress2 = (String)
                FieldUtils.readDeclaredField(channel2, "lookupServiceAddress", true);

        bundle = "public/default/0x00000000_0xffffffff";
        bundle1 = "public/default/0x00000000_0xfffffff0";
        bundle2 = "public/default/0xfffffff0_0xffffffff";
        bundle3 = "public/default3/0x00000000_0xffffffff";
        childBundle1Range = "0x7fffffff_0xffffffff";
        childBundle2Range = "0x00000000_0x7fffffff";

        childBundle11 = "public/default/" + childBundle1Range;
        childBundle12 = "public/default/" + childBundle2Range;

        childBundle31 = "public/default3/" + childBundle1Range;
        childBundle32 = "public/default3/" + childBundle2Range;
    }

    @BeforeMethod
    protected void initChannels() throws Exception {
        cleanTableViews();
        cleanOwnershipMonitorCounters(channel1);
        cleanOwnershipMonitorCounters(channel2);
        cleanOpsCounters(channel1);
        cleanOpsCounters(channel2);
        cleanMetadataState(channel1);
        cleanMetadataState(channel2);
    }


    @AfterClass
    @Override
    protected void cleanup() throws Exception {
        channel1.close();
        channel2.close();
        if (additionalPulsarTestContext != null) {
            additionalPulsarTestContext.close();
            additionalPulsarTestContext = null;
        }
        pulsar1 = null;
        pulsar2 = null;
        super.internalCleanup();
    }

    @Test(priority = -1)
    public void channelOwnerTest() throws Exception {
        var channelOwner1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        var channelOwner2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        assertEquals(channelOwner1, channelOwner2);
        LeaderElectionService leaderElectionService1 = (LeaderElectionService) FieldUtils.readDeclaredField(
                channel1, "leaderElectionService", true);
        leaderElectionService1.close();
        waitUntilNewChannelOwner(channel2, channelOwner1);
        leaderElectionService1.start();
        waitUntilNewChannelOwner(channel1, channelOwner1);

        var newChannelOwner1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS);
        var newChannelOwner2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS);

        assertEquals(newChannelOwner1, newChannelOwner2);
        assertNotEquals(channelOwner1, newChannelOwner1);

        if (newChannelOwner1.equals(Optional.of(lookupServiceAddress1))) {
            assertTrue(channel1.isChannelOwnerAsync().get(2, TimeUnit.SECONDS));
            assertFalse(channel2.isChannelOwnerAsync().get(2, TimeUnit.SECONDS));
        } else {
            assertFalse(channel1.isChannelOwnerAsync().get(2, TimeUnit.SECONDS));
            assertTrue(channel2.isChannelOwnerAsync().get(2, TimeUnit.SECONDS));
        }
    }

    @Test(priority = 0)
    public void channelValidationTest()
            throws ExecutionException, InterruptedException, IllegalAccessException, PulsarServerException,
            TimeoutException {
        var channel = createChannel(pulsar);
        int errorCnt = validateChannelStart(channel);
        assertEquals(6, errorCnt);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future startFuture = executor.submit(() -> {
            try {
                channel.start();
            } catch (PulsarServerException e) {
                throw new RuntimeException(e);
            }
        });
        errorCnt = validateChannelStart(channel);
        startFuture.get();
        assertTrue(errorCnt > 0);

        FieldUtils.writeDeclaredField(channel, "channelState",
                ServiceUnitStateChannelImpl.ChannelState.LeaderElectionServiceStarted, true);
        assertNotNull(channel.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get());

        Future closeFuture = executor.submit(()->{
            try {
                channel.close();
            } catch (PulsarServerException e) {
                throw new RuntimeException(e);
            }
        });
        errorCnt = validateChannelStart(channel);
        closeFuture.get();
        assertTrue(errorCnt > 0);
        errorCnt = validateChannelStart(channel);
        assertEquals(6, errorCnt);

        // check if we can close() again
        channel.close();
        errorCnt = validateChannelStart(channel);
        assertEquals(6, errorCnt);

        // close() -> start() test.
        channel.start();
        assertNotNull(channel.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get());

        // start() -> start() test.
        assertThrows(IllegalStateException.class, () -> channel.start());
        channel.close(); // cleanup
    }

    private int validateChannelStart(ServiceUnitStateChannelImpl channel)
            throws InterruptedException, TimeoutException {
        int errorCnt = 0;
        try {
            channel.isChannelOwnerAsync().get(2, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            if(e.getCause() instanceof IllegalStateException){
                errorCnt++;
            }
        }
        try {
            channel.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IllegalStateException) {
                errorCnt++;
            }
        }
        try {
            channel.getOwnerAsync(bundle).get(2, TimeUnit.SECONDS).get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IllegalStateException) {
                errorCnt++;
            }
        }
        try {
            channel.publishAssignEventAsync(bundle, lookupServiceAddress1).get(2, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IllegalStateException) {
                errorCnt++;
            }
        }
        try {
            channel.publishUnloadEventAsync(
                    new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2)))
                    .get(2, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IllegalStateException) {
                errorCnt++;
            }
        }
        try {
            Split split = new Split(bundle, lookupServiceAddress1, Map.of(
                    childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
            channel.publishSplitEventAsync(split)
                    .get(2, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IllegalStateException) {
                errorCnt++;
            }
        }
        return errorCnt;
    }

    @Test(priority = 1)
    public void compactionScheduleTest() {

        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(5, TimeUnit.SECONDS)
                .untilAsserted(() -> { // wait until true
                    try {
                        var threshold = admin.topicPolicies()
                                .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false).longValue();
                        assertEquals(5 * 1024 * 1024, threshold);
                    } catch (Exception e) {
                        ;
                    }
                });
    }

    @Test(priority = 2)
    public void assignmentTest()
            throws ExecutionException, InterruptedException, IllegalAccessException, TimeoutException {
        var getOwnerRequests1 = spy(getOwnerRequests((channel1)));
        var getOwnerRequests2 = spy(getOwnerRequests((channel2)));

        var owner1 = channel1.getOwnerAsync(bundle);
        var owner2 = channel2.getOwnerAsync(bundle);

        assertTrue(owner1.get().isEmpty());
        assertTrue(owner2.get().isEmpty());

        var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
        var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
        assertNotNull(assigned1);
        assertNotNull(assigned2);
        waitUntilOwnerChanges(channel1, bundle, null);
        waitUntilOwnerChanges(channel2, bundle, null);
        String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS);
        String assignedAddr2 = assigned2.get(5, TimeUnit.SECONDS);

        assertEquals(assignedAddr1, assignedAddr2);
        assertTrue(assignedAddr1.equals(lookupServiceAddress1)
                || assignedAddr1.equals(lookupServiceAddress2), assignedAddr1);

        var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
        var ownerAddr2 = channel2.getOwnerAsync(bundle).get();

        assertEquals(ownerAddr1, ownerAddr2);
        assertEquals(getOwnerRequests1.size(), 0);
        assertEquals(getOwnerRequests2.size(), 0);

        validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
        validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
        validateEventCounters(channel1, 1, 0, 0, 0, 0, 0);
        validateEventCounters(channel2, 1, 0, 0, 0, 0, 0);
    }

    @Test(priority = 3)
    public void assignmentTestWhenOneAssignmentFails()
            throws ExecutionException, InterruptedException, IllegalAccessException, TimeoutException {
        var getOwnerRequests1 = getOwnerRequests(channel1);
        var getOwnerRequests2 = getOwnerRequests(channel2);
        assertEquals(0, getOwnerRequests1.size());
        assertEquals(0, getOwnerRequests2.size());

        var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1,
                "producer", true);
        var spyProducer = spy(producer);
        var msg = mock(TypedMessageBuilder.class);
        var future = spy(CompletableFuture.failedFuture(new RuntimeException()));
        doReturn(msg).when(spyProducer).newMessage();
        doReturn(msg).when(msg).key(any());
        doReturn(msg).when(msg).value(any());
        doReturn(future).when(msg).sendAsync();

        FieldUtils.writeDeclaredField(channel1, "producer", spyProducer, true);

        var owner1 = channel1.getOwnerAsync(bundle);
        var owner2 = channel2.getOwnerAsync(bundle);

        assertTrue(owner1.get().isEmpty());
        assertTrue(owner2.get().isEmpty());

        var owner3 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
        var owner4 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
        assertTrue(owner3.isCompletedExceptionally());
        assertNotNull(owner4);
        String ownerAddrOpt2 = owner4.get(5, TimeUnit.SECONDS);
        assertEquals(ownerAddrOpt2, lookupServiceAddress2);
        waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
        assertEquals(0, getOwnerRequests1.size());
        assertEquals(0, getOwnerRequests2.size());

        FieldUtils.writeDeclaredField(channel1, "producer", producer, true);
    }

    @Test(priority = 4)
    public void transferTest()
            throws ExecutionException, InterruptedException, TimeoutException, IllegalAccessException {

        var owner1 = channel1.getOwnerAsync(bundle);
        var owner2 = channel2.getOwnerAsync(bundle);

        assertTrue(owner1.get().isEmpty());
        assertTrue(owner2.get().isEmpty());


        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
        var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
        var ownerAddr2 = channel2.getOwnerAsync(bundle).get();

        assertEquals(ownerAddr1, ownerAddr2);
        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));

        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2));
        channel1.publishUnloadEventAsync(unload);

        waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
        waitUntilNewOwner(channel2, bundle, lookupServiceAddress2);

        ownerAddr1 = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
        ownerAddr2 = channel2.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
        assertEquals(ownerAddr1, ownerAddr2);
        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));

        validateHandlerCounters(channel1, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
        validateHandlerCounters(channel2, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
        validateEventCounters(channel1, 1, 0, 0, 0, 1, 0);
        validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
    }

    @Test(priority = 5)
    public void transferTestWhenDestBrokerFails()
            throws ExecutionException, InterruptedException, IllegalAccessException {

        var getOwnerRequests1 = getOwnerRequests(channel1);
        var getOwnerRequests2 = getOwnerRequests(channel2);
        assertEquals(0, getOwnerRequests1.size());
        assertEquals(0, getOwnerRequests2.size());

        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
        var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
        var ownerAddr2 = channel2.getOwnerAsync(bundle).get();

        assertEquals(ownerAddr1, ownerAddr2);
        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));

        var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1,
                "producer", true);
        var spyProducer = spy(producer);
        var msg = mock(TypedMessageBuilder.class);
        var future = CompletableFuture.failedFuture(new RuntimeException());
        doReturn(msg).when(spyProducer).newMessage();
        doReturn(msg).when(msg).key(any());
        doReturn(msg).when(msg).value(any());
        doReturn(future).when(msg).sendAsync();
        FieldUtils.writeDeclaredField(channel2, "producer", spyProducer, true);
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2));
        channel1.publishUnloadEventAsync(unload);
        // channel1 is broken. the ownership transfer won't be complete.
        waitUntilState(channel1, bundle);
        waitUntilState(channel2, bundle);
        var owner1 = channel1.getOwnerAsync(bundle);
        var owner2 = channel2.getOwnerAsync(bundle);

        assertFalse(owner1.isDone());
        assertFalse(owner2.isDone());

        assertEquals(1, getOwnerRequests1.size());
        assertEquals(1, getOwnerRequests2.size());

        // In 5 secs, the getOwnerAsync requests(lookup requests) should time out.
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally()));
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally()));

        assertEquals(0, getOwnerRequests1.size());
        assertEquals(0, getOwnerRequests2.size());

        // recovered, check the monitor update state : Assigned -> Owned
        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
                .when(loadManager).selectAsync(any(), any());
        FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 1 , true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 1 , true);

        ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));
        ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));


        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
        ownerAddr1 = channel1.getOwnerAsync(bundle).get();
        ownerAddr2 = channel2.getOwnerAsync(bundle).get();

        assertEquals(ownerAddr1, ownerAddr2);
        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));

        var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
        validateMonitorCounters(leader,
                0,
                0,
                1,
                0,
                0,
                0,
                0);

        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);

    }

    @Test(priority = 6)
    public void splitAndRetryTest() throws Exception {
        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
        var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
        var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
        assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
        assertTrue(ownerAddr1.isPresent());

        NamespaceService namespaceService = spy(pulsar1.getNamespaceService());
        CompletableFuture<Void> future = new CompletableFuture<>();
        int badVersionExceptionCount = 3;
        AtomicInteger count = new AtomicInteger(badVersionExceptionCount);
        future.completeExceptionally(new MetadataStoreException.BadVersionException("BadVersion"));
        doAnswer(invocationOnMock -> {
            if (count.decrementAndGet() > 0) {
                return future;
            }
            // Call the real method
            reset(namespaceService);
            doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", "test-topic-2")))
                    .when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
            return future;
        }).when(namespaceService).updateNamespaceBundles(any(), any());
        doReturn(namespaceService).when(pulsar1).getNamespaceService();
        doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", "test-topic-2")))
                .when(namespaceService).getOwnedTopicListForNamespaceBundle(any());

        // Assert child bundle ownerships in the channels.

        Split split = new Split(bundle, ownerAddr1.get(), Map.of(
                childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
        channel1.publishSplitEventAsync(split);

        waitUntilState(channel1, bundle, Deleted);
        waitUntilState(channel2, bundle, Deleted);

        validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
        validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
        validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
        validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
        // Verify the retry count
        verify(((ServiceUnitStateChannelImpl) channel1), times(badVersionExceptionCount + 1))
                .splitServiceUnitOnceAndRetry(any(), any(), any(), any(), any(), any(), any(), any(), anyLong(), any());



        waitUntilNewOwner(channel1, childBundle11, lookupServiceAddress1);
        waitUntilNewOwner(channel1, childBundle12, lookupServiceAddress1);
        waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress1);
        waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress1);
        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle11).get());
        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle12).get());
        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle11).get());
        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle12).get());


        // try the monitor and check the monitor moves `Deleted` -> `Init`
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 1 , true);
        FieldUtils.writeDeclaredField(channel1,
                "semiTerminalStateWaitingTimeInMillis", 1, true);

        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 1 , true);
        FieldUtils.writeDeclaredField(channel2,
                "semiTerminalStateWaitingTimeInMillis", 1, true);

        ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));
        ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));
        waitUntilState(channel1, bundle, Init);
        waitUntilState(channel2, bundle, Init);

        var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
        validateMonitorCounters(leader,
                0,
                3,
                0,
                0,
                0,
                0,
                0);

        cleanTableView(channel1, childBundle11);
        cleanTableView(channel2, childBundle11);
        cleanTableView(channel1, childBundle12);
        cleanTableView(channel2, childBundle12);

        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel1,
                "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);

        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);
    }

    @Test(priority = 7)
    public void handleMetadataSessionEventTest() throws IllegalAccessException {
        var ts = System.currentTimeMillis();
        channel1.handleMetadataSessionEvent(SessionReestablished);
        var lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
        var lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);

        assertEquals(SessionReestablished, lastMetadataSessionEvent);
        assertThat(lastMetadataSessionEventTimestamp,
                greaterThanOrEqualTo(ts));

        ts = System.currentTimeMillis();
        channel1.handleMetadataSessionEvent(SessionLost);
        lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
        lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);

        assertEquals(SessionLost, lastMetadataSessionEvent);
        assertThat(lastMetadataSessionEventTimestamp,
                greaterThanOrEqualTo(ts));

        ts = System.currentTimeMillis();
        channel1.handleMetadataSessionEvent(ConnectionLost);
        lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
        lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);

        assertEquals(SessionLost, lastMetadataSessionEvent);
        assertThat(lastMetadataSessionEventTimestamp,
                lessThanOrEqualTo(ts));

        ts = System.currentTimeMillis();
        channel1.handleMetadataSessionEvent(Reconnected);
        lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
        lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);

        assertEquals(SessionLost, lastMetadataSessionEvent);
        assertThat(lastMetadataSessionEventTimestamp,
                lessThanOrEqualTo(ts));

    }

    @Test(priority = 8)
    public void handleBrokerCreationEventTest() throws IllegalAccessException {
        var cleanupJobs = getCleanupJobs(channel1);
        String broker = "broker-1";
        var future = new CompletableFuture();
        cleanupJobs.put(broker, future);
        channel1.handleBrokerRegistrationEvent(broker, NotificationType.Created);
        assertEquals(0, cleanupJobs.size());
        assertTrue(future.isCancelled());
    }

    @Test(priority = 9)
    public void handleBrokerDeletionEventTest()
            throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {

        var cleanupJobs1 = getCleanupJobs(channel1);
        var cleanupJobs2 = getCleanupJobs(channel2);
        var leaderCleanupJobsTmp = spy(cleanupJobs1);
        var followerCleanupJobsTmp = spy(cleanupJobs2);
        var leaderChannel = channel1;
        var followerChannel = channel2;
        String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        assertEquals(leader, leader2);
        if (leader.equals(lookupServiceAddress2)) {
            leaderChannel = channel2;
            followerChannel = channel1;
            var tmp = followerCleanupJobsTmp;
            followerCleanupJobsTmp = leaderCleanupJobsTmp;
            leaderCleanupJobsTmp = tmp;
        }
        final var leaderCleanupJobs = leaderCleanupJobsTmp;
        final var followerCleanupJobs = followerCleanupJobsTmp;
        FieldUtils.writeDeclaredField(leaderChannel, "cleanupJobs", leaderCleanupJobs,
                true);
        FieldUtils.writeDeclaredField(followerChannel, "cleanupJobs", followerCleanupJobs,
                true);

        var owner1 = channel1.getOwnerAsync(bundle1);
        var owner2 = channel2.getOwnerAsync(bundle2);
        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
                .when(loadManager).selectAsync(any(), any());
        assertTrue(owner1.get().isEmpty());
        assertTrue(owner2.get().isEmpty());

        String broker = lookupServiceAddress1;
        channel1.publishAssignEventAsync(bundle1, broker);
        channel2.publishAssignEventAsync(bundle2, broker);

        waitUntilNewOwner(channel1, bundle1, broker);
        waitUntilNewOwner(channel2, bundle1, broker);
        waitUntilNewOwner(channel1, bundle2, broker);
        waitUntilNewOwner(channel2, bundle2, broker);

        // Verify to transfer the ownership to the other broker.
        channel1.publishUnloadEventAsync(new Unload(broker, bundle1, Optional.of(lookupServiceAddress2)));
        waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
        waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);

        // test stable metadata state
        leaderChannel.handleMetadataSessionEvent(SessionReestablished);
        followerChannel.handleMetadataSessionEvent(SessionReestablished);
        FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
                System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
        FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp",
                System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        leaderChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, NotificationType.Deleted);

        waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
        waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
        waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
        waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);

        verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
        verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());

        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals(0, leaderCleanupJobs.size());
            assertEquals(0, followerCleanupJobs.size());
        });

        validateMonitorCounters(leaderChannel,
                2,
                0,
                3,
                0,
                2,
                0,
                0);


        // test jittery metadata state
        channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle1, Optional.of(broker)));
        channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle2, Optional.of(broker)));
        waitUntilNewOwner(channel1, bundle1, broker);
        waitUntilNewOwner(channel2, bundle1, broker);
        waitUntilNewOwner(channel1, bundle2, broker);
        waitUntilNewOwner(channel2, bundle2, broker);

        leaderChannel.handleMetadataSessionEvent(SessionReestablished);
        followerChannel.handleMetadataSessionEvent(SessionReestablished);
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

        verify(leaderCleanupJobs, times(2)).computeIfAbsent(eq(broker), any());
        verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());

        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals(1, leaderCleanupJobs.size());
            assertEquals(0, followerCleanupJobs.size());
        });

        validateMonitorCounters(leaderChannel,
                2,
                0,
                3,
                0,
                3,
                0,
                0);

        // broker is back online
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Created);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Created);

        verify(leaderCleanupJobs, times(2)).computeIfAbsent(eq(broker), any());
        verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());

        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals(0, leaderCleanupJobs.size());
            assertEquals(0, followerCleanupJobs.size());
        });

        validateMonitorCounters(leaderChannel,
                2,
                0,
                3,
                0,
                3,
                0,
                1);


        // broker is offline again
        FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3, true);
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

        verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
        verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals(1, leaderCleanupJobs.size());
            assertEquals(0, followerCleanupJobs.size());
        });

        validateMonitorCounters(leaderChannel,
                2,
                0,
                3,
                0,
                4,
                0,
                1);

        // finally cleanup
        waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
        waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
        waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
        waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);

        verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
        verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals(0, leaderCleanupJobs.size());
            assertEquals(0, followerCleanupJobs.size());
        });

        validateMonitorCounters(leaderChannel,
                3,
                0,
                5,
                0,
                4,
                0,
                1);

        // test unstable state
        channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle1, Optional.of(broker)));
        channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle2, Optional.of(broker)));
        waitUntilNewOwner(channel1, bundle1, broker);
        waitUntilNewOwner(channel2, bundle1, broker);
        waitUntilNewOwner(channel1, bundle2, broker);
        waitUntilNewOwner(channel2, bundle2, broker);

        leaderChannel.handleMetadataSessionEvent(SessionLost);
        followerChannel.handleMetadataSessionEvent(SessionLost);
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

        verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
        verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals(0, leaderCleanupJobs.size());
            assertEquals(0, followerCleanupJobs.size());
        });

        validateMonitorCounters(leaderChannel,
                3,
                0,
                5,
                0,
                4,
                1,
                1);

        // clean-up
        FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true);
        FieldUtils.writeDeclaredField(channel1, "cleanupJobs", cleanupJobs1,
                true);
        FieldUtils.writeDeclaredField(channel2, "cleanupJobs", cleanupJobs2,
                true);
    }

    @Test(priority = 10)
    public void conflictAndCompactionTest() throws ExecutionException, InterruptedException, TimeoutException,
            IllegalAccessException, PulsarClientException, PulsarServerException {
        String bundle = String.format("%s/%s", "public/default", "0x0000000a_0xffffffff");
        var owner1 = channel1.getOwnerAsync(bundle);
        var owner2 = channel2.getOwnerAsync(bundle);
        assertTrue(owner1.get().isEmpty());
        assertTrue(owner2.get().isEmpty());

        var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
        assertNotNull(assigned1);

        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
        String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS);
        assertEquals(lookupServiceAddress1, assignedAddr1);

        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
        var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
        assertNotNull(assigned2);
        Exception ex = null;
        try {
            assigned2.join();
        } catch (CompletionException e) {
            ex = e;
        }
        assertNull(ex);
        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(bundle).get());
        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(bundle).get());

        var compactor = spy (pulsar1.getStrategicCompactor());
        Field strategicCompactorField = FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true);
        FieldUtils.writeField(strategicCompactorField, pulsar1, compactor, true);
        FieldUtils.writeField(strategicCompactorField, pulsar2, compactor, true);
        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(140, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
                    verify(compactor, times(1))
                            .compact(eq(ServiceUnitStateChannelImpl.TOPIC), any());
                });


        var channel3 = createChannel(pulsar);
        channel3.start();
        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(5, TimeUnit.SECONDS)
                .untilAsserted(() -> assertEquals(
                        channel3.getOwnerAsync(bundle).get(), Optional.of(lookupServiceAddress1)));
        channel3.close();
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
    }

    @Test(priority = 11)
    public void ownerLookupCountTests() throws IllegalAccessException {

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, "b1", 1));
        channel1.getOwnerAsync(bundle);
        channel1.getOwnerAsync(bundle);

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, "b1", 1));
        channel1.getOwnerAsync(bundle);
        channel1.getOwnerAsync(bundle);
        channel1.getOwnerAsync(bundle);

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, "b1", 1));
        channel1.getOwnerAsync(bundle);
        channel1.getOwnerAsync(bundle);

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, "b1", 1));
        channel1.getOwnerAsync(bundle);

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, "b1", 1));
        channel1.getOwnerAsync(bundle);

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, "b1", 1));
        channel1.getOwnerAsync(bundle);
        channel1.getOwnerAsync(bundle);

        overrideTableView(channel1, bundle, null);
        channel1.getOwnerAsync(bundle);
        channel1.getOwnerAsync(bundle);
        channel1.getOwnerAsync(bundle);

        validateOwnerLookUpCounters(channel1, 2, 3, 2, 1, 1, 2, 3);

    }

    @Test(priority = 12)
    public void unloadTest()
            throws ExecutionException, InterruptedException, IllegalAccessException {

        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);

        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
        var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
        var ownerAddr2 = channel2.getOwnerAsync(bundle).get();

        assertEquals(ownerAddr1, ownerAddr2);
        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty());

        channel1.publishUnloadEventAsync(unload);

        waitUntilState(channel1, bundle, Free);
        waitUntilState(channel2, bundle, Free);
        var owner1 = channel1.getOwnerAsync(bundle);
        var owner2 = channel2.getOwnerAsync(bundle);

        assertEquals(Optional.empty(), owner1.get());
        assertEquals(Optional.empty(), owner2.get());

        channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);

        waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
        waitUntilNewOwner(channel2, bundle, lookupServiceAddress2);

        ownerAddr1 = channel1.getOwnerAsync(bundle).get();
        ownerAddr2 = channel2.getOwnerAsync(bundle).get();

        assertEquals(ownerAddr1, ownerAddr2);
        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
        Unload unload2 = new Unload(lookupServiceAddress2, bundle, Optional.empty());

        channel2.publishUnloadEventAsync(unload2);

        waitUntilState(channel1, bundle, Free);
        waitUntilState(channel2, bundle, Free);

        // test monitor if Free -> Init
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 1 , true);
        FieldUtils.writeDeclaredField(channel1,
                "semiTerminalStateWaitingTimeInMillis", 1, true);

        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 1 , true);
        FieldUtils.writeDeclaredField(channel2,
                "semiTerminalStateWaitingTimeInMillis", 1, true);

        ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));
        ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));
        waitUntilState(channel1, bundle, Init);
        waitUntilState(channel2, bundle, Init);

        var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
        validateMonitorCounters(leader,
                0,
                1,
                0,
                0,
                0,
                0,
                0);


        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel1,
                "semiTerminalStateWaitingTimeInMillis", 30 * 1000, true);

        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 300 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);
    }

    @Test(priority = 13)
    public void assignTestWhenDestBrokerProducerFails()
            throws ExecutionException, InterruptedException, IllegalAccessException {

        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty());

        channel1.publishUnloadEventAsync(unload);

        waitUntilState(channel1, bundle, Free);
        waitUntilState(channel2, bundle, Free);

        assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());
        assertEquals(Optional.empty(), channel2.getOwnerAsync(bundle).get());

        var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1,
                "producer", true);
        var spyProducer = spy(producer);
        var msg = mock(TypedMessageBuilder.class);
        var future = CompletableFuture.failedFuture(new RuntimeException());
        doReturn(msg).when(spyProducer).newMessage();
        doReturn(msg).when(msg).key(any());
        doReturn(msg).when(msg).value(any());
        doReturn(future).when(msg).sendAsync();
        FieldUtils.writeDeclaredField(channel2, "producer", spyProducer, true);
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
                .when(loadManager).selectAsync(any(), any());
        channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
        // channel1 is broken. the assign won't be complete.
        waitUntilState(channel1, bundle);
        waitUntilState(channel2, bundle);
        var owner1 = channel1.getOwnerAsync(bundle);
        var owner2 = channel2.getOwnerAsync(bundle);

        assertFalse(owner1.isDone());
        assertFalse(owner2.isDone());

        // In 5 secs, the getOwnerAsync requests(lookup requests) should time out.
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally()));
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally()));

        // recovered, check the monitor update state : Assigned -> Owned
        FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 1 , true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 1 , true);

        ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));
        ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));


        waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
        waitUntilNewOwner(channel2, bundle, lookupServiceAddress2);
        var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
        var ownerAddr2 = channel2.getOwnerAsync(bundle).get();

        assertEquals(ownerAddr1, ownerAddr2);
        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));

        var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
        validateMonitorCounters(leader,
                0,
                0,
                1,
                0,
                0,
                0,
                0);

        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);

    }

    @Test(priority = 14)
    public void splitTestWhenProducerFails()
            throws ExecutionException, InterruptedException, IllegalAccessException {


        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty());

        channel1.publishUnloadEventAsync(unload);

        waitUntilState(channel1, bundle, Free);
        waitUntilState(channel2, bundle, Free);

        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);

        waitUntilState(channel1, bundle, Owned);
        waitUntilState(channel2, bundle, Owned);

        assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
        assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(bundle).get().get());

        var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1,
                "producer", true);
        var spyProducer = spy(producer);
        var msg = mock(TypedMessageBuilder.class);
        var future = CompletableFuture.failedFuture(new RuntimeException());
        doReturn(msg).when(spyProducer).newMessage();
        doReturn(msg).when(msg).key(any());
        doReturn(msg).when(msg).value(any());
        doReturn(future).when(msg).sendAsync();
        FieldUtils.writeDeclaredField(channel1, "producer", spyProducer, true);
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
        // Assert child bundle ownerships in the channels.


        Split split = new Split(bundle, lookupServiceAddress1, Map.of(
                childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
        channel2.publishSplitEventAsync(split);
        // channel1 is broken. the split won't be complete.
        waitUntilState(channel1, bundle);
        waitUntilState(channel2, bundle);
        var owner1 = channel1.getOwnerAsync(bundle);
        var owner2 = channel2.getOwnerAsync(bundle);


        // recovered, check the monitor update state : Splitting -> Owned
        FieldUtils.writeDeclaredField(channel1, "producer", producer, true);
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 1 , true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 1 , true);


        var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;

        waitUntilStateWithMonitor(leader, bundle, Deleted);
        waitUntilStateWithMonitor(channel1, bundle, Deleted);
        waitUntilStateWithMonitor(channel2, bundle, Deleted);

        var ownerAddr1 = channel1.getOwnerAsync(bundle);
        var ownerAddr2 = channel2.getOwnerAsync(bundle);

        assertTrue(ownerAddr1.isCompletedExceptionally());
        assertTrue(ownerAddr2.isCompletedExceptionally());


        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);

    }

    @Test(priority = 15)
    public void testIsOwner() throws IllegalAccessException {

        var owner1 = channel1.isOwner(bundle);
        var owner2 = channel2.isOwner(bundle);

        assertFalse(owner1);
        assertFalse(owner2);

        owner1 = channel1.isOwner(bundle, lookupServiceAddress2);
        owner2 = channel2.isOwner(bundle, lookupServiceAddress1);

        assertFalse(owner1);
        assertFalse(owner2);

        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
        owner2 = channel2.isOwner(bundle);
        assertFalse(owner2);

        waitUntilOwnerChanges(channel1, bundle, null);
        waitUntilOwnerChanges(channel2, bundle, null);

        owner1 = channel1.isOwner(bundle);
        owner2 = channel2.isOwner(bundle);

        assertTrue(owner1);
        assertFalse(owner2);

        owner1 = channel1.isOwner(bundle, lookupServiceAddress1);
        owner2 = channel2.isOwner(bundle, lookupServiceAddress2);

        assertTrue(owner1);
        assertFalse(owner2);

        owner1 = channel2.isOwner(bundle, lookupServiceAddress1);
        owner2 = channel1.isOwner(bundle, lookupServiceAddress2);

        assertTrue(owner1);
        assertFalse(owner2);

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, lookupServiceAddress1, 1));
        assertFalse(channel1.isOwner(bundle));

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, lookupServiceAddress1, 1));
        assertTrue(channel1.isOwner(bundle));

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, null, lookupServiceAddress1, 1));
        assertFalse(channel1.isOwner(bundle));

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, lookupServiceAddress1, 1));
        assertTrue(channel1.isOwner(bundle));

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, null, lookupServiceAddress1, 1));
        assertFalse(channel1.isOwner(bundle));

        overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, null, lookupServiceAddress1, 1));
        assertFalse(channel1.isOwner(bundle));

        overrideTableView(channel1, bundle, null);
        assertFalse(channel1.isOwner(bundle));
    }

    @Test(priority = 16)
    public void splitAndRetryFailureTest() throws Exception {
        channel1.publishAssignEventAsync(bundle3, lookupServiceAddress1);
        waitUntilNewOwner(channel1, bundle3, lookupServiceAddress1);
        waitUntilNewOwner(channel2, bundle3, lookupServiceAddress1);
        var ownerAddr1 = channel1.getOwnerAsync(bundle3).get();
        var ownerAddr2 = channel2.getOwnerAsync(bundle3).get();
        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
        assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
        assertTrue(ownerAddr1.isPresent());

        NamespaceService namespaceService = spy(pulsar1.getNamespaceService());
        CompletableFuture<Void> future = new CompletableFuture<>();
        int badVersionExceptionCount = 10;
        AtomicInteger count = new AtomicInteger(badVersionExceptionCount);
        future.completeExceptionally(new MetadataStoreException.BadVersionException("BadVersion"));
        doAnswer(invocationOnMock -> {
            if (count.decrementAndGet() > 0) {
                return future;
            }
            // Call the real method
            reset(namespaceService);
            doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", "test-topic-2")))
                    .when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
            return future;
        }).when(namespaceService).updateNamespaceBundlesForPolicies(any(), any());
        doReturn(namespaceService).when(pulsar1).getNamespaceService();
        doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", "test-topic-2")))
                .when(namespaceService).getOwnedTopicListForNamespaceBundle(any());

        // Assert child bundle ownerships in the channels.

        Split split = new Split(bundle3, ownerAddr1.get(), Map.of(
                childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
        channel1.publishSplitEventAsync(split);

        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 1 , true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 1 , true);

        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    assertEquals(3, count.get());
                });
        var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
        ((ServiceUnitStateChannelImpl) leader)
                .monitorOwnerships(List.of(lookupServiceAddress1, lookupServiceAddress2));
        waitUntilState(leader, bundle3, Deleted);
        waitUntilState(channel1, bundle3, Deleted);
        waitUntilState(channel2, bundle3, Deleted);


        validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 1, 0);
        validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0);
        validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
        validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);

        waitUntilNewOwner(channel1, childBundle31, lookupServiceAddress1);
        waitUntilNewOwner(channel1, childBundle32, lookupServiceAddress1);
        waitUntilNewOwner(channel2, childBundle31, lookupServiceAddress1);
        waitUntilNewOwner(channel2, childBundle32, lookupServiceAddress1);
        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle31).get());
        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle32).get());
        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle31).get());
        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle32).get());


        // try the monitor and check the monitor moves `Deleted` -> `Init`

        FieldUtils.writeDeclaredField(channel1,
                "semiTerminalStateWaitingTimeInMillis", 1, true);
        FieldUtils.writeDeclaredField(channel2,
                "semiTerminalStateWaitingTimeInMillis", 1, true);

        ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));
        ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
                List.of(lookupServiceAddress1, lookupServiceAddress2));
        waitUntilState(channel1, bundle3, Init);
        waitUntilState(channel2, bundle3, Init);

        validateMonitorCounters(leader,
                0,
                3,
                1,
                0,
                0,
                0,
                0);


        cleanTableViews();

        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel1,
                "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);

        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);
    }

    @Test(priority = 17)
    public void testOverrideInactiveBrokerStateData()
            throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {

        var leaderChannel = channel1;
        var followerChannel = channel2;
        String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        assertEquals(leader, leader2);
        if (leader.equals(lookupServiceAddress2)) {
            leaderChannel = channel2;
            followerChannel = channel1;
        }

        String broker = lookupServiceAddress1;

        // test override states
        String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
        String splittingBundle = bundle;
        String assigningBundle = "public/assigning/0xfffffff0_0xffffffff";
        String freeBundle = "public/free/0xfffffff0_0xffffffff";
        String deletedBundle = "public/deleted/0xfffffff0_0xffffffff";
        String ownedBundle = "public/owned/0xfffffff0_0xffffffff";
        overrideTableViews(releasingBundle,
                new ServiceUnitStateData(Releasing, null, broker, 1));
        overrideTableViews(splittingBundle,
                new ServiceUnitStateData(Splitting, null, broker,
                        Map.of(childBundle1Range, Optional.empty(),
                                childBundle2Range, Optional.empty()), 1));
        overrideTableViews(assigningBundle,
                new ServiceUnitStateData(Assigning, broker, null, 1));
        overrideTableViews(freeBundle,
                new ServiceUnitStateData(Free, null, broker, 1));
        overrideTableViews(deletedBundle,
                new ServiceUnitStateData(Deleted, null, broker, 1));
        overrideTableViews(ownedBundle,
                new ServiceUnitStateData(Owned, broker, null, 1));

        // test stable metadata state
        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
                .when(loadManager).selectAsync(any(), any());
        leaderChannel.handleMetadataSessionEvent(SessionReestablished);
        followerChannel.handleMetadataSessionEvent(SessionReestablished);
        FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
                System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
        FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp",
                System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

        waitUntilNewOwner(channel2, releasingBundle, lookupServiceAddress2);
        waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress2);
        waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress2);
        waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
        waitUntilNewOwner(channel2, ownedBundle, lookupServiceAddress2);
        assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
        assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
        assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());

        // clean-up
        FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true);
        cleanTableViews();

    }

    @Test(priority = 18)
    public void testOverrideOrphanStateData()
            throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {

        var leaderChannel = channel1;
        var followerChannel = channel2;
        String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        assertEquals(leader, leader2);
        if (leader.equals(lookupServiceAddress2)) {
            leaderChannel = channel2;
            followerChannel = channel1;
        }

        String broker = lookupServiceAddress1;

        // test override states
        String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
        String splittingBundle = bundle;
        String assigningBundle = "public/assigning/0xfffffff0_0xffffffff";
        String freeBundle = "public/free/0xfffffff0_0xffffffff";
        String deletedBundle = "public/deleted/0xfffffff0_0xffffffff";
        String ownedBundle = "public/owned/0xfffffff0_0xffffffff";
        overrideTableViews(releasingBundle,
                new ServiceUnitStateData(Releasing, null, broker, 1));
        overrideTableViews(splittingBundle,
                new ServiceUnitStateData(Splitting, null, broker,
                        Map.of(childBundle1Range, Optional.empty(),
                                childBundle2Range, Optional.empty()), 1));
        overrideTableViews(assigningBundle,
                new ServiceUnitStateData(Assigning, broker, null, 1));
        overrideTableViews(freeBundle,
                new ServiceUnitStateData(Free, null, broker, 1));
        overrideTableViews(deletedBundle,
                new ServiceUnitStateData(Deleted, null, broker, 1));
        overrideTableViews(ownedBundle,
                new ServiceUnitStateData(Owned, broker, null, 1));

        // test stable metadata state
        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
                .when(loadManager).selectAsync(any(), any());
        FieldUtils.writeDeclaredField(leaderChannel, "inFlightStateWaitingTimeInMillis",
                -1, true);
        FieldUtils.writeDeclaredField(followerChannel, "inFlightStateWaitingTimeInMillis",
                -1, true);
        ((ServiceUnitStateChannelImpl) leaderChannel)
                .monitorOwnerships(List.of(lookupServiceAddress1, lookupServiceAddress2));

        waitUntilNewOwner(channel2, releasingBundle, broker);
        waitUntilNewOwner(channel2, childBundle11, broker);
        waitUntilNewOwner(channel2, childBundle12, broker);
        waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
        waitUntilNewOwner(channel2, ownedBundle, broker);
        assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
        assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
        assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());

        // clean-up
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel2,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        cleanTableViews();
    }


    private static ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
            ServiceUnitStateChannel channel) throws IllegalAccessException {
        return (ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>>)
                FieldUtils.readDeclaredField(channel,
                        "getOwnerRequests", true);
    }

    private static SessionEvent getLastMetadataSessionEvent(ServiceUnitStateChannel channel)
            throws IllegalAccessException {
        return (SessionEvent)
                FieldUtils.readField(channel, "lastMetadataSessionEvent", true);
    }

    private static long getLastMetadataSessionEventTimestamp(ServiceUnitStateChannel channel)
            throws IllegalAccessException {
        return (long)
                FieldUtils.readField(channel, "lastMetadataSessionEventTimestamp", true);
    }

    private static ConcurrentOpenHashMap<String, CompletableFuture<Void>> getCleanupJobs(
            ServiceUnitStateChannel channel) throws IllegalAccessException {
        return (ConcurrentOpenHashMap<String, CompletableFuture<Void>>)
                FieldUtils.readField(channel, "cleanupJobs", true);
    }


    private static void waitUntilNewChannelOwner(ServiceUnitStateChannel channel, String oldOwner) {
        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> { // wait until true
                    CompletableFuture<Optional<String>> owner = channel.getChannelOwnerAsync();
                    if (!owner.isDone()) {
                        return false;
                    }

                    return !StringUtils.equals(oldOwner, owner.get().orElse(null));
                });
    }

    private static void waitUntilOwnerChanges(ServiceUnitStateChannel channel, String serviceUnit, String oldOwner) {
        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> { // wait until true
                    CompletableFuture<Optional<String>> owner = channel.getOwnerAsync(serviceUnit);
                    if (!owner.isDone()) {
                        return false;
                    }
                    return !StringUtils.equals(oldOwner, owner.get().orElse(null));
                });
    }

    private static void waitUntilNewOwner(ServiceUnitStateChannel channel, String serviceUnit, String newOwner) {
        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(15, TimeUnit.SECONDS)
                .until(() -> { // wait until true
                    try {
                        CompletableFuture<Optional<String>> owner = channel.getOwnerAsync(serviceUnit);
                        if (!owner.isDone()) {
                            return false;
                        }
                        return StringUtils.equals(newOwner, owner.get().orElse(null));
                    } catch (Exception e) {
                        return false;
                    }
                });
    }

    private static void waitUntilState(ServiceUnitStateChannel channel, String key)
            throws IllegalAccessException {
        TableViewImpl<ServiceUnitStateData> tv = (TableViewImpl<ServiceUnitStateData>)
                FieldUtils.readField(channel, "tableview", true);
        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> { // wait until true
                    ServiceUnitStateData actual = tv.get(key);
                    if (actual == null) {
                        return true;
                    } else {
                        return actual.state() != Owned;
                    }
                });
    }

    private static void waitUntilState(ServiceUnitStateChannel channel, String key, ServiceUnitState expected)
            throws IllegalAccessException {
        TableViewImpl<ServiceUnitStateData> tv = (TableViewImpl<ServiceUnitStateData>)
                FieldUtils.readField(channel, "tableview", true);
        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> { // wait until true
                    ServiceUnitStateData data = tv.get(key);
                    ServiceUnitState actual = state(data);
                    return actual == expected;
                });
    }

    private void waitUntilStateWithMonitor(ServiceUnitStateChannel channel, String key, ServiceUnitState expected)
            throws IllegalAccessException {
        TableViewImpl<ServiceUnitStateData> tv = (TableViewImpl<ServiceUnitStateData>)
                FieldUtils.readField(channel, "tableview", true);
        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> { // wait until true
                    ((ServiceUnitStateChannelImpl) channel)
                            .monitorOwnerships(List.of(lookupServiceAddress1, lookupServiceAddress2));
                    ServiceUnitStateData data = tv.get(key);
                    ServiceUnitState actual = state(data);
                    return actual == expected;
                });
    }

    private static void cleanTableView(ServiceUnitStateChannel channel, String serviceUnit)
            throws IllegalAccessException {
        var tv = (TableViewImpl<ServiceUnitStateData>)
                FieldUtils.readField(channel, "tableview", true);
        var cache = (ConcurrentMap<String, ServiceUnitStateData>)
                FieldUtils.readField(tv, "data", true);
        cache.remove(serviceUnit);
    }

    private void cleanTableViews()
            throws IllegalAccessException {
        var tv1 = (TableViewImpl<ServiceUnitStateData>)
                FieldUtils.readField(channel1, "tableview", true);
        var cache1 = (ConcurrentMap<String, ServiceUnitStateData>)
                FieldUtils.readField(tv1, "data", true);
        cache1.clear();

        var tv2 = (TableViewImpl<ServiceUnitStateData>)
                FieldUtils.readField(channel2, "tableview", true);
        var cache2 = (ConcurrentMap<String, ServiceUnitStateData>)
                FieldUtils.readField(tv2, "data", true);
        cache2.clear();
    }

    private void overrideTableViews(String serviceUnit, ServiceUnitStateData val) throws IllegalAccessException {
        overrideTableView(channel1, serviceUnit, val);
        overrideTableView(channel2, serviceUnit, val);
    }

    private static void overrideTableView(ServiceUnitStateChannel channel, String serviceUnit, ServiceUnitStateData val)
            throws IllegalAccessException {
        var tv = (TableViewImpl<ServiceUnitStateData>)
                FieldUtils.readField(channel, "tableview", true);
        var cache = (ConcurrentMap<String, ServiceUnitStateData>)
                FieldUtils.readField(tv, "data", true);
        if(val == null){
            cache.remove(serviceUnit);
        } else {
            cache.put(serviceUnit, val);
        }
    }

    private static void cleanOpsCounters(ServiceUnitStateChannel channel)
            throws IllegalAccessException {
        var handlerCounters =
                (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
                        FieldUtils.readDeclaredField(channel, "handlerCounters", true);

        for(var val : handlerCounters.values()){
            val.getFailure().set(0);
            val.getTotal().set(0);
        }

        var eventCounters =
                (Map<ServiceUnitStateChannelImpl.EventType, ServiceUnitStateChannelImpl.Counters>)
                        FieldUtils.readDeclaredField(channel, "eventCounters", true);

        for(var val : eventCounters.values()){
            val.getFailure().set(0);
            val.getTotal().set(0);
        }

        var ownerLookUpCounters =
                (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
                        FieldUtils.readDeclaredField(channel, "ownerLookUpCounters", true);

        for(var val : ownerLookUpCounters.values()){
            val.getFailure().set(0);
            val.getTotal().set(0);
        }
    }

    private void cleanOwnershipMonitorCounters(ServiceUnitStateChannel channel) throws IllegalAccessException {
        FieldUtils.writeDeclaredField(channel, "totalInactiveBrokerCleanupCnt", 0, true);
        FieldUtils.writeDeclaredField(channel, "totalServiceUnitTombstoneCleanupCnt", 0, true);
        FieldUtils.writeDeclaredField(channel, "totalOrphanServiceUnitCleanupCnt", 0, true);
        FieldUtils.writeDeclaredField(channel, "totalCleanupErrorCnt", new AtomicLong(0), true);
        FieldUtils.writeDeclaredField(channel, "totalInactiveBrokerCleanupScheduledCnt", 0, true);
        FieldUtils.writeDeclaredField(channel, "totalInactiveBrokerCleanupIgnoredCnt", 0, true);
        FieldUtils.writeDeclaredField(channel, "totalInactiveBrokerCleanupCancelledCnt", 0, true);
    }

    private void cleanMetadataState(ServiceUnitStateChannel channel) throws IllegalAccessException {
        channel.handleMetadataSessionEvent(SessionReestablished);
        FieldUtils.writeDeclaredField(channel, "lastMetadataSessionEventTimestamp", 0L, true);
    }

    private static long getCleanupMetric(ServiceUnitStateChannel channel, String metric)
            throws IllegalAccessException {
        Object var = FieldUtils.readDeclaredField(channel, metric, true);
        if (var instanceof AtomicLong) {
            return ((AtomicLong) var).get();
        } else {
            return (long) var;
        }
    }

    private static void validateHandlerCounters(ServiceUnitStateChannel channel,
                                                long assignedT, long assignedF,
                                                long ownedT, long ownedF,
                                                long releasedT, long releasedF,
                                                long splittingT, long splittingF,
                                                long freeT, long freeF,
                                                long initT, long initF,
                                                long deletedT, long deletedF)
            throws IllegalAccessException {
        var handlerCounters =
                (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
                        FieldUtils.readDeclaredField(channel, "handlerCounters", true);

        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() -> { // wait until true
                    assertEquals(assignedT, handlerCounters.get(Assigning).getTotal().get());
                    assertEquals(assignedF, handlerCounters.get(Assigning).getFailure().get());
                    assertEquals(ownedT, handlerCounters.get(Owned).getTotal().get());
                    assertEquals(ownedF, handlerCounters.get(Owned).getFailure().get());
                    assertEquals(releasedT, handlerCounters.get(Releasing).getTotal().get());
                    assertEquals(releasedF, handlerCounters.get(Releasing).getFailure().get());
                    assertEquals(splittingT, handlerCounters.get(Splitting).getTotal().get());
                    assertEquals(splittingF, handlerCounters.get(Splitting).getFailure().get());
                    assertEquals(freeT, handlerCounters.get(Free).getTotal().get());
                    assertEquals(freeF, handlerCounters.get(Free).getFailure().get());
                    assertEquals(initT, handlerCounters.get(Init).getTotal().get());
                    assertEquals(initF, handlerCounters.get(Init).getFailure().get());
                    assertEquals(deletedT, handlerCounters.get(Deleted).getTotal().get());
                    assertEquals(deletedF, handlerCounters.get(Deleted).getFailure().get());
                });
    }

    private static void validateEventCounters(ServiceUnitStateChannel channel,
                                              long assignT, long assignF,
                                              long splitT, long splitF,
                                              long unloadT, long unloadF)
            throws IllegalAccessException {
        var eventCounters =
                (Map<ServiceUnitStateChannelImpl.EventType, ServiceUnitStateChannelImpl.Counters>)
                        FieldUtils.readDeclaredField(channel, "eventCounters", true);

        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() -> { // wait until true
                    assertEquals(assignT, eventCounters.get(Assign).getTotal().get());
                    assertEquals(assignF, eventCounters.get(Assign).getFailure().get());
                    assertEquals(splitT, eventCounters.get(Split).getTotal().get());
                    assertEquals(splitF, eventCounters.get(Split).getFailure().get());
                    assertEquals(unloadT, eventCounters.get(Unload).getTotal().get());
                    assertEquals(unloadF, eventCounters.get(Unload).getFailure().get());
                });
    }

    private static void validateOwnerLookUpCounters(ServiceUnitStateChannel channel,
                                                    long assigned,
                                                    long owned,
                                                    long released,
                                                    long splitting,
                                                    long free,
                                                    long deleted,
                                                    long init
                                                    )
            throws IllegalAccessException {
        var ownerLookUpCounters =
                (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
                        FieldUtils.readDeclaredField(channel, "ownerLookUpCounters", true);

        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() -> { // wait until true
                    assertEquals(assigned, ownerLookUpCounters.get(Assigning).getTotal().get());
                    assertEquals(owned, ownerLookUpCounters.get(Owned).getTotal().get());
                    assertEquals(released, ownerLookUpCounters.get(Releasing).getTotal().get());
                    assertEquals(splitting, ownerLookUpCounters.get(Splitting).getTotal().get());
                    assertEquals(free, ownerLookUpCounters.get(Free).getTotal().get());
                    assertEquals(deleted, ownerLookUpCounters.get(Deleted).getTotal().get());
                    assertEquals(init, ownerLookUpCounters.get(Init).getTotal().get());
                });
    }

    private static void validateMonitorCounters(ServiceUnitStateChannel channel,
                                                long totalInactiveBrokerCleanupCnt,
                                                long totalServiceUnitTombstoneCleanupCnt,
                                                long totalOrphanServiceUnitCleanupCnt,
                                                long totalCleanupErrorCnt,
                                                long totalInactiveBrokerCleanupScheduledCnt,
                                                long totalInactiveBrokerCleanupIgnoredCnt,
                                                long totalInactiveBrokerCleanupCancelledCnt)
            throws IllegalAccessException {
        assertEquals(totalInactiveBrokerCleanupCnt, getCleanupMetric(channel, "totalInactiveBrokerCleanupCnt"));
        assertEquals(totalServiceUnitTombstoneCleanupCnt,
                getCleanupMetric(channel, "totalServiceUnitTombstoneCleanupCnt"));
        assertEquals(totalOrphanServiceUnitCleanupCnt, getCleanupMetric(channel, "totalOrphanServiceUnitCleanupCnt"));
        assertEquals(totalCleanupErrorCnt, getCleanupMetric(channel, "totalCleanupErrorCnt"));
        assertEquals(totalInactiveBrokerCleanupScheduledCnt,
                getCleanupMetric(channel, "totalInactiveBrokerCleanupScheduledCnt"));
        assertEquals(totalInactiveBrokerCleanupIgnoredCnt,
                getCleanupMetric(channel, "totalInactiveBrokerCleanupIgnoredCnt"));
        assertEquals(totalInactiveBrokerCleanupCancelledCnt,
                getCleanupMetric(channel, "totalInactiveBrokerCleanupCancelledCnt"));
    }

    ServiceUnitStateChannelImpl createChannel(PulsarService pulsar)
            throws IllegalAccessException {
        var tmpChannel = new ServiceUnitStateChannelImpl(pulsar);
        FieldUtils.writeDeclaredField(tmpChannel, "ownershipMonitorDelayTimeInSecs", 5, true);
        var channel = spy(tmpChannel);

        doReturn(loadManagerContext).when(channel).getContext();
        doReturn(registry).when(channel).getBrokerRegistry();
        doReturn(loadManager).when(channel).getLoadManager();


        var leaderElectionService = new LeaderElectionService(
                pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(),
                state -> {
                    if (state == LeaderElectionState.Leading) {
                        channel.scheduleOwnershipMonitor();
                    } else {
                        channel.cancelOwnershipMonitor();
                    }
                });
        leaderElectionService.start();

        doReturn(leaderElectionService).when(channel).getLeaderElectionService();

        return channel;
    }
}
