/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.pulsar.broker.testcontext;

import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
import org.apache.zookeeper.data.ACL;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
import org.mockito.internal.util.MockUtil;

/**
 * A test context that can be used to set up a Pulsar broker and associated resources.
 *
 * There are 2 types of Pulsar unit tests that use a PulsarService:
 * <ul>
 * <li>Some Pulsar unit tests use a PulsarService that isn't started</li>
 * <li>Some Pulsar unit tests start the PulsarService and use less mocking</li>
 * </ul>
 *
 * This class can be used to set up a PulsarService that can be used in both types of tests.
 *
 * There are few motivations for PulsarTestContext:
 * <ul>
 * <li>It reduces the reliance on Mockito for hooking into the PulsarService for injecting mocks or customizing the behavior of some
 * collaborators. Mockito is not thread-safe and some mocking operations get corrupted. Some examples of the issues: https://github.com/apache/pulsar/issues/13620, https://github.com/apache/pulsar/issues/16444 and https://github.com/apache/pulsar/issues/16427.</li>
 * <li>Since the Mockito issue causes test flakiness, this change will improve reliability.</li>
 * <li>It makes it possible to use composition over inheritance in test classes. This can help reduce the dependency on
 * deep test base cases hierarchies.</li>
 * <li>It reduces code duplication across test classes.</li>
 * </ul>
 *
 * <h2>Example usage of a PulsarService that is started</h2>
 * <pre>{@code
 * PulsarTestContext testContext = PulsarTestContext.builder()
 *     .spyByDefault()
 *     .withMockZooKeeper()
 *     .build();
 * PulsarService pulsarService = testContext.getPulsarService();
 * try {
 *     // Do some testing
 * } finally {
 *     testContext.close();
 * }
 * }</pre>
 *
 * <h2>Example usage of a PulsarService that is not started at all</h2>
 * <pre>{@code
 * PulsarTestContext testContext = PulsarTestContext.builderForNonStartableContext()
 *     .spyByDefault()
 *     .build();
 * PulsarService pulsarService = testContext.getPulsarService();
 * try {
 *     // Do some testing
 * } finally {
 *     testContext.close();
 * }
 * }</pre>
 */
@Slf4j
@ToString
@Getter
@Builder(builderClassName = "Builder")
public class PulsarTestContext implements AutoCloseable {
    private final ServiceConfiguration config;
    private final MetadataStoreExtended localMetadataStore;
    private final MetadataStoreExtended configurationMetadataStore;
    private final PulsarResources pulsarResources;

    private final OrderedExecutor executor;

    private final ManagedLedgerStorage managedLedgerClientFactory;

    private final PulsarService pulsarService;

    private final Compactor compactor;

    private final CompactionServiceFactory compactionServiceFactory;

    private final BrokerService brokerService;

    @Getter(AccessLevel.NONE)
    @Singular("registerCloseable")
    private final List<AutoCloseable> closeables;

    private final BrokerInterceptor brokerInterceptor;

    private final BookKeeper bookKeeperClient;

    private final MockZooKeeper mockZooKeeper;

    private final MockZooKeeper mockZooKeeperGlobal;

    private final SpyConfig spyConfig;

    private final boolean startable;


    public ManagedLedgerFactory getManagedLedgerFactory() {
        return managedLedgerClientFactory.getManagedLedgerFactory();
    }

    public PulsarMockBookKeeper getMockBookKeeper() {
        return PulsarMockBookKeeper.class.cast(bookKeeperClient);
    }

    /**
     * Create a builder for a PulsarTestContext that creates a PulsarService that
     * is started.
     *
     * @return a builder for a PulsarTestContext
     */
    public static Builder builder() {
        return new StartableCustomBuilder();
    }

    /**
     * Create a builder for a PulsarTestContext that creates a PulsarService that
     * cannot be started. Some tests need to create a PulsarService that cannot be started.
     * This was added when this type of tests were migrated to use PulsarTestContext.
     *
     * @return a builder for a PulsarTestContext that cannot be started.
     */
    public static Builder builderForNonStartableContext() {
        return new NonStartableCustomBuilder();
    }

    /**
     * Close the PulsarTestContext and all the resources that it created.
     *
     * @throws Exception if there is an error closing the resources
     */
    public void close() throws Exception {
        for (int i = closeables.size() - 1; i >= 0; i--) {
            try {
                closeables.get(i).close();
            } catch (Exception e) {
                log.error("Failure in calling cleanup function", e);
            }
        }
    }

    /**
     * Create a ServerCnx instance that has a Mockito spy that is recording invocations.
     * This is useful for tests for ServerCnx.
     *
     * @return a ServerCnx instance
     */
    public ServerCnx createServerCnxSpy() {
        return BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
                getPulsarService());
    }

    /**
     * A builder for a PulsarTestContext.
     *
     * The builder is created with Lombok. That is the reason why the builder source code doesn't show all behaviors
     */
    public static class Builder {
        protected boolean useTestPulsarResources = false;
        protected MetadataStore pulsarResourcesMetadataStore;
        protected SpyConfig.Builder spyConfigBuilder = SpyConfig.builder(SpyConfig.SpyType.NONE);
        protected Consumer<PulsarService> pulsarServiceCustomizer;
        protected ServiceConfiguration svcConfig = initializeConfig();
        protected Consumer<ServiceConfiguration> configOverrideCustomizer = this::defaultOverrideServiceConfiguration;
        protected Function<BrokerService, BrokerService> brokerServiceCustomizer = Function.identity();

        /**
         * Initialize the ServiceConfiguration with default values.
         */
        protected ServiceConfiguration initializeConfig() {
            ServiceConfiguration svcConfig = new ServiceConfiguration();
            defaultOverrideServiceConfiguration(svcConfig);
            return svcConfig;
        }

        /**
         * Some settings like the broker shutdown timeout and thread pool sizes are
         * overridden if the provided values are the default values.
         * This is used to run tests with smaller thread pools and shorter timeouts by default.
         * You can use <pre>{@code .configCustomizer(null)}</pre> to disable this behavior
         */
        protected void defaultOverrideServiceConfiguration(ServiceConfiguration svcConfig) {
            ServiceConfiguration unconfiguredDefaults = new ServiceConfiguration();

            // adjust brokerShutdownTimeoutMs if it is the default value or if it is 0
            if (svcConfig.getBrokerShutdownTimeoutMs() == unconfiguredDefaults.getBrokerShutdownTimeoutMs()
                    || svcConfig.getBrokerShutdownTimeoutMs() == 0L) {
                svcConfig.setBrokerShutdownTimeoutMs(resolveBrokerShutdownTimeoutMs());
            }

            // adjust thread pool sizes if they are the default values

            if (svcConfig.getNumIOThreads() == unconfiguredDefaults.getNumIOThreads()) {
                svcConfig.setNumIOThreads(4);
            }
            if (svcConfig.getNumOrderedExecutorThreads() == unconfiguredDefaults.getNumOrderedExecutorThreads()) {
                // use a single threaded ordered executor by default
                svcConfig.setNumOrderedExecutorThreads(1);
            }
            if (svcConfig.getNumExecutorThreadPoolSize() == unconfiguredDefaults.getNumExecutorThreadPoolSize()) {
                svcConfig.setNumExecutorThreadPoolSize(5);
            }
            if (svcConfig.getNumCacheExecutorThreadPoolSize()
                    == unconfiguredDefaults.getNumCacheExecutorThreadPoolSize()) {
                svcConfig.setNumCacheExecutorThreadPoolSize(2);
            }
            if (svcConfig.getNumHttpServerThreads() == unconfiguredDefaults.getNumHttpServerThreads()) {
                svcConfig.setNumHttpServerThreads(8);
            }

            // change the default value for ports so that a random port is used
            if (unconfiguredDefaults.getBrokerServicePort().equals(svcConfig.getBrokerServicePort())) {
                svcConfig.setBrokerServicePort(Optional.of(0));
            }
            if (unconfiguredDefaults.getWebServicePort().equals(svcConfig.getWebServicePort())) {
                svcConfig.setWebServicePort(Optional.of(0));
            }

            // change the default value for nic speed
            if (unconfiguredDefaults.getLoadBalancerOverrideBrokerNicSpeedGbps()
                    .equals(svcConfig.getLoadBalancerOverrideBrokerNicSpeedGbps())) {
                svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
            }

            // set the cluster name if it's unset
            if (svcConfig.getClusterName() == null) {
                svcConfig.setClusterName("test");
            }

            // adjust managed ledger cache size
            if (svcConfig.getManagedLedgerCacheSizeMB() == unconfiguredDefaults.getManagedLedgerCacheSizeMB()) {
                svcConfig.setManagedLedgerCacheSizeMB(8);
            }

            if (svcConfig.getTopicLoadTimeoutSeconds() == unconfiguredDefaults.getTopicLoadTimeoutSeconds()) {
                svcConfig.setTopicLoadTimeoutSeconds(10);
            }
        }

        /**
         * Internal method used in the {@link StartableCustomBuilder} to override the default value for the broker
         * shutdown timeout.
         * @return the broker shutdown timeout in milliseconds
         */
        protected long resolveBrokerShutdownTimeoutMs() {
            return 0L;
        }

        /**
         * Configure the PulsarService instance and the PulsarService collaborator objects to use Mockito spies by default.
         * @see SpyConfig
         * @return the builder
         */
        public Builder spyByDefault() {
            spyConfigBuilder = SpyConfig.builder(SpyConfig.SpyType.SPY);
            return this;
        }

        public Builder spyConfigCustomizer(Consumer<SpyConfig.Builder> spyConfigCustomizer) {
            spyConfigCustomizer.accept(spyConfigBuilder);
            return this;
        }

        /**
         * Customize the ServiceConfiguration object that is used to configure the PulsarService instance.
         * @param configCustomerizer the function to customize the ServiceConfiguration instance
         * @return the builder
         */
        public Builder configCustomizer(Consumer<ServiceConfiguration> configCustomerizer) {
            configCustomerizer.accept(svcConfig);
            if (config != null) {
                configCustomerizer.accept(config);
            }
            return this;
        }

        /**
         * Override configuration values in the ServiceConfiguration instance as a last step.
         * There are default overrides provided by
         * {@link PulsarTestContext.Builder#defaultOverrideServiceConfiguration(ServiceConfiguration)}
         * that can be disabled by using <pre>{@code .configOverride(null)}</pre>
         *
         * @param configOverrideCustomizer the function that contains overrides to ServiceConfiguration,
         *                                 set to null to disable default overrides
         * @return the builder
         */
        public Builder configOverride(Consumer<ServiceConfiguration> configOverrideCustomizer) {
            this.configOverrideCustomizer = configOverrideCustomizer;
            return this;
        }

        /**
         * Customize the PulsarService instance.
         * @param pulsarServiceCustomizer the function to customize the PulsarService instance
         * @return the builder
         */
        public Builder pulsarServiceCustomizer(
                Consumer<PulsarService> pulsarServiceCustomizer) {
            this.pulsarServiceCustomizer = pulsarServiceCustomizer;
            return this;
        }

        /**
         * Reuses the BookKeeper client and metadata stores from another PulsarTestContext.
         * @param otherContext the other PulsarTestContext
         * @return the builder
         */
        public Builder reuseMockBookkeeperAndMetadataStores(PulsarTestContext otherContext) {
            bookKeeperClient(otherContext.getBookKeeperClient());
            if (otherContext.getMockZooKeeper() != null) {
                mockZooKeeper(otherContext.getMockZooKeeper());
                if (otherContext.getMockZooKeeperGlobal() != null) {
                    mockZooKeeperGlobal(otherContext.getMockZooKeeperGlobal());
                }
            } else {
                localMetadataStore(NonClosingProxyHandler.createNonClosingProxy(otherContext.getLocalMetadataStore(),
                        MetadataStoreExtended.class
                ));
                configurationMetadataStore(NonClosingProxyHandler.createNonClosingProxy(
                        otherContext.getConfigurationMetadataStore(), MetadataStoreExtended.class
                ));
            }
            return this;
        }

        /**
         * Reuses the {@link SpyConfig} from another PulsarTestContext.
         * @param otherContext the other PulsarTestContext
         * @return the builder
         */
        public Builder reuseSpyConfig(PulsarTestContext otherContext) {
            spyConfigBuilder = otherContext.getSpyConfig().toBuilder();
            return this;
        }

        /**
         * Chains closing of the other PulsarTestContext to this one.
         * The other PulsarTestContext will be closed when this one is closed.
         */
        public Builder chainClosing(PulsarTestContext otherContext) {
            registerCloseable(otherContext);
            return this;
        }

        /**
         * Configure this PulsarTestContext to use a mock ZooKeeper instance which is
         * shared for both the local and configuration metadata stores.
         *
         * @return the builder
         */
        public Builder withMockZookeeper() {
            return withMockZookeeper(false);
        }

        /**
         * Configure this PulsarTestContext to use a mock ZooKeeper instance.
         *
         * @param useSeparateGlobalZk if true, the global (configuration) zookeeper will be a separate instance
         * @return the builder
         */
        public Builder withMockZookeeper(boolean useSeparateGlobalZk) {
            try {
                mockZooKeeper(createMockZooKeeper());
                if (useSeparateGlobalZk) {
                    mockZooKeeperGlobal(createMockZooKeeper());
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return this;
        }

        private MockZooKeeper createMockZooKeeper() throws Exception {
            MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
            List<ACL> dummyAclList = new ArrayList<>(0);

            ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
                    "".getBytes(StandardCharsets.UTF_8), dummyAclList, CreateMode.PERSISTENT);

            zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList,
                    CreateMode.PERSISTENT);

            registerCloseable(zk::shutdown);
            return zk;
        }

        /**
         * Applicable only when PulsarTestContext is not startable. This will configure mocks
         * for PulsarTestResources and related classes.
         *
         * @return the builder
         */
        public Builder useTestPulsarResources() {
            if (startable) {
                throw new IllegalStateException("Cannot useTestPulsarResources when startable.");
            }
            useTestPulsarResources = true;
            return this;
        }

        /**
         * Applicable only when PulsarTestContext is not startable. This will configure mocks
         * for PulsarTestResources and related collaborator instances.
         * The {@link NamespaceResources} and {@link TopicResources} instances will use the provided
         * {@link MetadataStore} instance.
         * @param metadataStore the metadata store to use
         * @return the builder
         */
        public Builder useTestPulsarResources(MetadataStore metadataStore) {
            if (startable) {
                throw new IllegalStateException("Cannot useTestPulsarResources when startable.");
            }
            useTestPulsarResources = true;
            pulsarResourcesMetadataStore = metadataStore;
            return this;
        }

        /**
         * Applicable only when PulsarTestContext is not startable. This will configure the {@link BookKeeper}
         * and {@link ManagedLedgerFactory} instances to use for creating a {@link ManagedLedgerStorage} instance
         * for PulsarService.
         *
         * @param bookKeeperClient the bookkeeper client to use (mock bookkeeper)
         * @param managedLedgerFactory the managed ledger factory to use (could be a mock)
         * @return the builder
         */
        public Builder managedLedgerClients(BookKeeper bookKeeperClient,
                                            ManagedLedgerFactory managedLedgerFactory) {
            return managedLedgerClientFactory(
                    PulsarTestContext.createManagedLedgerClientFactory(bookKeeperClient, managedLedgerFactory));
        }

        /**
         * Configures a function to use for customizing the {@link BrokerService} instance when it gets created.
         * @return the builder
         */
        public Builder brokerServiceCustomizer(Function<BrokerService, BrokerService> brokerServiceCustomizer) {
            this.brokerServiceCustomizer = brokerServiceCustomizer;
            return this;
        }
    }

    /**
     * Internal class that contains customizations for the Lombok generated Builder.
     *
     * With Lombok, it is necessary to extend the generated Builder class for adding customizations related to
     * instantiation and completing the builder.
     */
    static abstract class AbstractCustomBuilder extends Builder {
        AbstractCustomBuilder(boolean startable) {
            super.startable = startable;
        }

        public Builder startable(boolean startable) {
            throw new UnsupportedOperationException("Cannot change startability after builder creation.");
        }

        @Override
        public final PulsarTestContext build() {
            SpyConfig spyConfig = spyConfigBuilder.build();
            spyConfig(spyConfig);
            if (super.config == null) {
                config(svcConfig);
            }
            if (configOverrideCustomizer != null) {
                configOverrideCustomizer.accept(super.config);
            }
            if (super.brokerInterceptor != null) {
                super.config.setDisableBrokerInterceptors(false);
            }
            initializeCommonPulsarServices(spyConfig);
            initializePulsarServices(spyConfig, this);
            if (pulsarServiceCustomizer != null) {
                pulsarServiceCustomizer.accept(super.pulsarService);
            }
            if (super.startable) {
                try {
                    super.pulsarService.start();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            brokerService(super.pulsarService.getBrokerService());
            return super.build();
        }

        private void initializeCommonPulsarServices(SpyConfig spyConfig) {
            if (super.bookKeeperClient == null && super.managedLedgerClientFactory == null) {
                if (super.executor == null) {
                    OrderedExecutor createdExecutor = OrderedExecutor.newBuilder().numThreads(1)
                            .name(PulsarTestContext.class.getSimpleName() + "-executor").build();
                    registerCloseable(() -> GracefulExecutorServicesShutdown.initiate()
                            .timeout(Duration.ZERO)
                            .shutdown(createdExecutor)
                            .handle().get());
                    super.executor = createdExecutor;
                }
                NonClosableMockBookKeeper mockBookKeeper;
                try {
                    mockBookKeeper =
                            spyConfig.getBookKeeperClient().spy(NonClosableMockBookKeeper.class, super.executor);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                registerCloseable(() -> {
                    mockBookKeeper.reallyShutdown();
                    resetSpyOrMock(mockBookKeeper);
                });
                bookKeeperClient(mockBookKeeper);
            }
            if (super.bookKeeperClient == null && super.managedLedgerClientFactory != null) {
                bookKeeperClient(super.managedLedgerClientFactory.getBookKeeperClient());
            }
            if (super.localMetadataStore == null || super.configurationMetadataStore == null) {
                if (super.mockZooKeeper != null) {
                    MetadataStoreExtended mockZookeeperMetadataStore =
                            createMockZookeeperMetadataStore(super.mockZooKeeper, MetadataStoreConfig.METADATA_STORE);
                    if (super.localMetadataStore == null) {
                        localMetadataStore(mockZookeeperMetadataStore);
                    }
                    if (super.configurationMetadataStore == null) {
                        if (super.mockZooKeeperGlobal != null) {
                            configurationMetadataStore(createMockZookeeperMetadataStore(super.mockZooKeeperGlobal,
                                    MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
                        } else {
                            configurationMetadataStore(mockZookeeperMetadataStore);
                        }
                    }
                } else {
                    try {
                        MetadataStoreExtended store = MetadataStoreFactoryImpl.createExtended("memory:local",
                                MetadataStoreConfig.builder().build());
                        registerCloseable(() -> {
                            store.close();
                            resetSpyOrMock(store);
                        });
                        MetadataStoreExtended nonClosingProxy =
                                NonClosingProxyHandler.createNonClosingProxy(store, MetadataStoreExtended.class
                                );
                        if (super.localMetadataStore == null) {
                            localMetadataStore(nonClosingProxy);
                        }
                        if (super.configurationMetadataStore == null) {
                            configurationMetadataStore(nonClosingProxy);
                        }
                    } catch (MetadataStoreException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }

        private MetadataStoreExtended createMockZookeeperMetadataStore(MockZooKeeper mockZooKeeper,
                                                                       String metadataStoreName) {
            // provide a unique session id for each instance
            MockZooKeeperSession mockZooKeeperSession = MockZooKeeperSession.newInstance(mockZooKeeper, false);
            registerCloseable(() -> {
                mockZooKeeperSession.close();
                resetSpyOrMock(mockZooKeeperSession);
            });
            ZKMetadataStore zkMetadataStore = new ZKMetadataStore(mockZooKeeperSession,
                    MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build());
            registerCloseable(() -> {
                zkMetadataStore.close();
                resetSpyOrMock(zkMetadataStore);
            });
            MetadataStoreExtended nonClosingProxy =
                    NonClosingProxyHandler.createNonClosingProxy(zkMetadataStore, MetadataStoreExtended.class);
            return nonClosingProxy;
        }

        protected abstract void initializePulsarServices(SpyConfig spyConfig, Builder builder);
    }

    static void resetSpyOrMock(Object object) {
        if (MockUtil.isMock(object)) {
            Mockito.reset(object);
        }
    }

    /**
     * Customizations for a builder for creating a PulsarTestContext that is "startable".
     */
    static class StartableCustomBuilder extends AbstractCustomBuilder {
        StartableCustomBuilder() {
            super(true);
        }

        @Override
        public Builder managedLedgerClientFactory(ManagedLedgerStorage managedLedgerClientFactory) {
            throw new IllegalStateException("Cannot set managedLedgerClientFactory when startable.");
        }

        @Override
        public Builder pulsarResources(PulsarResources pulsarResources) {
            throw new IllegalStateException("Cannot set pulsarResources when startable.");
        }

        @Override
        protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
            BookKeeperClientFactory bookKeeperClientFactory =
                    new MockBookKeeperClientFactory(builder.bookKeeperClient);
            CompactionServiceFactory compactionServiceFactory = builder.compactionServiceFactory;
            if (builder.compactionServiceFactory == null && builder.config.getCompactionServiceFactoryClassName()
                    .equals(PulsarCompactionServiceFactory.class.getName())) {
                compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
            }
            PulsarService pulsarService = spyConfig.getPulsarService()
                    .spy(StartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
                            builder.configurationMetadataStore, compactionServiceFactory,
                            builder.brokerInterceptor,
                            bookKeeperClientFactory, builder.brokerServiceCustomizer);
            if (compactionServiceFactory != null) {
                compactionServiceFactory.initialize(pulsarService);
            }
            registerCloseable(() -> {
                pulsarService.close();
                resetSpyOrMock(pulsarService);
            });
            pulsarService(pulsarService);
        }

        @Override
        protected long resolveBrokerShutdownTimeoutMs() {
            // wait 5 seconds for the startable pulsar service to shutdown gracefully
            // this reduces the chances that some threads of the PulsarTestsContexts of subsequent test executions
            // are running in parallel. It doesn't prevent it completely.
            return 5000L;
        }
    }

    /**
     * Customizations for a builder for creating a PulsarTestContext that is "non-startable".
     */
    static class NonStartableCustomBuilder extends AbstractCustomBuilder {

        NonStartableCustomBuilder() {
            super(false);
        }

        @Override
        protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
            if (builder.managedLedgerClientFactory == null) {
                ManagedLedgerFactory mlFactoryMock = Mockito.mock(ManagedLedgerFactory.class);
                managedLedgerClientFactory(
                        PulsarTestContext.createManagedLedgerClientFactory(builder.bookKeeperClient, mlFactoryMock));
            }
            if (builder.pulsarResources == null) {
                SpyConfig.SpyType spyConfigPulsarResources = spyConfig.getPulsarResources();
                if (useTestPulsarResources) {
                    MetadataStore metadataStore = pulsarResourcesMetadataStore;
                    if (metadataStore == null) {
                        metadataStore = builder.configurationMetadataStore;
                    }
                    NamespaceResources nsr = spyConfigPulsarResources.spy(NamespaceResources.class, metadataStore, 30);
                    TopicResources tsr = spyConfigPulsarResources.spy(TopicResources.class, metadataStore);
                    pulsarResources(
                            spyConfigPulsarResources.spy(
                                    NonStartableTestPulsarService.TestPulsarResources.class, builder.localMetadataStore,
                                    builder.configurationMetadataStore,
                                    tsr, nsr));
                } else {
                    pulsarResources(
                            spyConfigPulsarResources.spy(PulsarResources.class, builder.localMetadataStore,
                                    builder.configurationMetadataStore));
                }
            }
            BookKeeperClientFactory bookKeeperClientFactory =
                    new MockBookKeeperClientFactory(builder.bookKeeperClient);
            CompactionServiceFactory compactionServiceFactory = builder.compactionServiceFactory;
            if (builder.compactionServiceFactory == null && builder.config.getCompactionServiceFactoryClassName()
                    .equals(PulsarCompactionServiceFactory.class.getName())) {
                compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
            }
            PulsarService pulsarService = spyConfig.getPulsarService()
                    .spy(NonStartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
                            builder.configurationMetadataStore, compactionServiceFactory,
                            builder.brokerInterceptor,
                            bookKeeperClientFactory, builder.pulsarResources,
                            builder.managedLedgerClientFactory, builder.brokerServiceCustomizer);
            if (compactionServiceFactory != null) {
                compactionServiceFactory.initialize(pulsarService);
            }
            registerCloseable(() -> {
                pulsarService.close();
                resetSpyOrMock(pulsarService);
            });
            pulsarService(pulsarService);
        }
    }

    @NotNull
    private static ManagedLedgerStorage createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
                                                                         ManagedLedgerFactory managedLedgerFactory) {
        return new ManagedLedgerStorage() {

            @Override
            public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
                                   BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup)
                    throws Exception {

            }

            @Override
            public ManagedLedgerFactory getManagedLedgerFactory() {
                return managedLedgerFactory;
            }

            @Override
            public StatsProvider getStatsProvider() {
                return new NullStatsProvider();
            }

            @Override
            public BookKeeper getBookKeeperClient() {
                return bookKeeperClient;
            }

            @Override
            public void close() throws IOException {

            }
        };
    }
}
