// blob: 40a42286fda7450b9218019e7646d9340c7e02dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.testcontext;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
import org.apache.zookeeper.data.ACL;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
import org.mockito.internal.util.MockUtil;
/**
* A test context that can be used to set up a Pulsar broker and associated resources.
*
* There are 2 types of Pulsar unit tests that use a PulsarService:
* <ul>
* <li>Some Pulsar unit tests use a PulsarService that isn't started</li>
* <li>Some Pulsar unit tests start the PulsarService and use less mocking</li>
* </ul>
*
* This class can be used to set up a PulsarService that can be used in both types of tests.
*
* There are a few motivations for PulsarTestContext:
* <ul>
* <li>It reduces the reliance on Mockito for hooking into the PulsarService for injecting mocks or customizing the behavior of some
* collaborators. Mockito is not thread-safe and some mocking operations get corrupted. Some examples of the issues: https://github.com/apache/pulsar/issues/13620, https://github.com/apache/pulsar/issues/16444 and https://github.com/apache/pulsar/issues/16427.</li>
* <li>Since the Mockito issue causes test flakiness, this change will improve reliability.</li>
* <li>It makes it possible to use composition over inheritance in test classes. This can help reduce the dependency on
* deep test base class hierarchies.</li>
* <li>It reduces code duplication across test classes.</li>
* </ul>
*
* <h2>Example usage of a PulsarService that is started</h2>
* <pre>{@code
* PulsarTestContext testContext = PulsarTestContext.builder()
* .spyByDefault()
* .withMockZookeeper()
* .build();
* PulsarService pulsarService = testContext.getPulsarService();
* try {
* // Do some testing
* } finally {
* testContext.close();
* }
* }</pre>
*
* <h2>Example usage of a PulsarService that is not started at all</h2>
* <pre>{@code
* PulsarTestContext testContext = PulsarTestContext.builderForNonStartableContext()
* .spyByDefault()
* .build();
* PulsarService pulsarService = testContext.getPulsarService();
* try {
* // Do some testing
* } finally {
* testContext.close();
* }
* }</pre>
*/
@Slf4j
@ToString
@Getter
@Builder(builderClassName = "Builder")
public class PulsarTestContext implements AutoCloseable {
// Broker configuration of this context. The builder falls back to its own default ServiceConfiguration
// when no configuration was set explicitly.
private final ServiceConfiguration config;
// Local metadata store passed to the PulsarService. The builder creates one (mock ZooKeeper based or
// in-memory) when none is provided.
private final MetadataStoreExtended localMetadataStore;
// Configuration metadata store passed to the PulsarService. The builder creates one when none is provided.
private final MetadataStoreExtended configurationMetadataStore;
// PulsarResources passed to the non-startable PulsarService; the startable builder rejects setting it.
private final PulsarResources pulsarResources;
// Executor handed to the mock BookKeeper client when the builder creates that client.
private final OrderedExecutor executor;
// Storage that supplies the instance returned by getManagedLedgerFactory(); the startable builder
// rejects setting it.
private final ManagedLedgerStorage managedLedgerClientFactory;
// The PulsarService created by the builder. It is started during build() only when startable is true.
private final PulsarService pulsarService;
// Optional compactor that is handed to the MockPulsarCompactionServiceFactory created by the builder.
private final Compactor compactor;
// Compaction service factory that was supplied to the builder, if any.
private final CompactionServiceFactory compactionServiceFactory;
// Set at the end of build() from pulsarService.getBrokerService().
private final BrokerService brokerService;
// Cleanup callbacks collected with registerCloseable(...). close() invokes them in reverse order.
// No getter is generated for this field.
@Getter(AccessLevel.NONE)
@Singular("registerCloseable")
private final List<AutoCloseable> closeables;
// Optional broker interceptor. When it is set, build() calls setDisableBrokerInterceptors(false) on the config.
private final BrokerInterceptor brokerInterceptor;
// BookKeeper client. The builder creates a mock client when neither this nor managedLedgerClientFactory
// is provided.
private final BookKeeper bookKeeperClient;
// Mock ZooKeeper backing the metadata stores that the builder creates, when configured.
private final MockZooKeeper mockZooKeeper;
// Optional separate mock ZooKeeper that backs the configuration metadata store created by the builder.
private final MockZooKeeper mockZooKeeperGlobal;
// Spy configuration built in build() from the builder's SpyConfig.Builder.
private final SpyConfig spyConfig;
// Whether build() starts the PulsarService. Fixed by the builder type that was used.
private final boolean startable;
/**
 * Returns the {@link ManagedLedgerFactory} exposed by the {@link ManagedLedgerStorage} of this context.
 *
 * @return the managed ledger factory obtained from {@code managedLedgerClientFactory}
 */
public ManagedLedgerFactory getManagedLedgerFactory() {
    final ManagedLedgerStorage storage = this.managedLedgerClientFactory;
    return storage.getManagedLedgerFactory();
}
/**
 * Returns the BookKeeper client of this context viewed as a {@link PulsarMockBookKeeper}.
 *
 * @return the BookKeeper client cast to {@link PulsarMockBookKeeper}
 * @throws ClassCastException if the client is not a {@link PulsarMockBookKeeper}
 */
public PulsarMockBookKeeper getMockBookKeeper() {
    final BookKeeper client = this.bookKeeperClient;
    return PulsarMockBookKeeper.class.cast(client);
}
/**
 * Creates a builder for a "startable" PulsarTestContext, i.e. a context whose
 * PulsarService gets started while the context is built.
 *
 * @return a new builder for a startable PulsarTestContext
 */
public static Builder builder() {
    final Builder startableBuilder = new StartableCustomBuilder();
    return startableBuilder;
}
/**
 * Creates a builder for a "non-startable" PulsarTestContext, i.e. a context whose
 * PulsarService is created but not started by the builder.
 * Some tests need such a PulsarService; this builder was added when that type of test
 * was migrated to use PulsarTestContext.
 *
 * @return a new builder for a PulsarTestContext that cannot be started
 */
public static Builder builderForNonStartableContext() {
    final Builder nonStartableBuilder = new NonStartableCustomBuilder();
    return nonStartableBuilder;
}
/**
 * Closes the PulsarTestContext and all the resources that were registered with it.
 * <p>
 * The registered closeables are invoked in reverse registration order. A failure of a single
 * closeable is logged and does not prevent the remaining ones from being closed, so failures
 * of individual closeables are not rethrown by this method.
 *
 * @throws Exception declared for compatibility with {@link AutoCloseable#close()}
 */
@Override
public void close() throws Exception {
    // walk backwards so that resources are released in the opposite order of their registration
    for (int i = closeables.size() - 1; i >= 0; i--) {
        try {
            closeables.get(i).close();
        } catch (Exception e) {
            // deliberately best-effort: log and continue with the remaining closeables
            log.error("Failure in calling cleanup function", e);
        }
    }
}
/**
 * Creates a {@link ServerCnx} for the PulsarService of this context. The instance is created with
 * {@link BrokerTestUtil#spyWithClassAndConstructorArgsRecordingInvocations}, so it is a Mockito spy
 * that records invocations. This is useful for tests for ServerCnx.
 *
 * @return a ServerCnx spy instance
 */
public ServerCnx createServerCnxSpy() {
    final PulsarService service = getPulsarService();
    return BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class, service);
}
/**
* A builder for a PulsarTestContext.
*
* The builder is completed by Lombok, which generates the setter methods for the fields of {@link PulsarTestContext}.
* That is the reason why the builder source code doesn't show all behaviors.
*/
public static class Builder {
// When true, the non-startable builder creates TestPulsarResources together with spied
// NamespaceResources and TopicResources instead of plain PulsarResources.
protected boolean useTestPulsarResources = false;
// Optional metadata store for those NamespaceResources/TopicResources; the configuration metadata
// store is used when this is null.
protected MetadataStore pulsarResourcesMetadataStore;
// Spy configuration under construction; no spying unless spyByDefault() or reuseSpyConfig(...) replaces it.
protected SpyConfig.Builder spyConfigBuilder = SpyConfig.builder(SpyConfig.SpyType.NONE);
// Optional hook invoked by build() with the created PulsarService before the service is started.
protected Consumer<PulsarService> pulsarServiceCustomizer;
// Configuration used by build() when no config was set explicitly; it already has the test defaults applied.
protected ServiceConfiguration svcConfig = initializeConfig();
// Overrides applied by build() to the effective configuration; null disables them.
protected Consumer<ServiceConfiguration> configOverrideCustomizer = this::defaultOverrideServiceConfiguration;
// Function handed to the test PulsarService for customizing the BrokerService; identity by default.
protected Function<BrokerService, BrokerService> brokerServiceCustomizer = Function.identity();
/**
 * Creates the ServiceConfiguration that the builder starts from: a fresh instance with the
 * adjustments of {@link #defaultOverrideServiceConfiguration(ServiceConfiguration)} applied.
 *
 * @return the pre-adjusted ServiceConfiguration
 */
protected ServiceConfiguration initializeConfig() {
    final ServiceConfiguration initialConfig = new ServiceConfiguration();
    defaultOverrideServiceConfiguration(initialConfig);
    return initialConfig;
}
/**
 * Some settings like the broker shutdown timeout and thread pool sizes are
 * overridden if the provided values are the default values.
 * This is used to run tests with smaller thread pools and shorter timeouts by default.
 * <p>
 * This method is the default override hook that is applied to the configuration when the context is built.
 * Use {@code .configOverride(null)} to disable this behavior. Passing {@code null} to
 * {@code configCustomizer} does not disable it; that call fails with a NullPointerException.
 *
 * @param svcConfig the configuration to adjust in place
 */
protected void defaultOverrideServiceConfiguration(ServiceConfiguration svcConfig) {
    // a pristine instance provides the stock defaults to compare against
    ServiceConfiguration unconfiguredDefaults = new ServiceConfiguration();

    // adjust brokerShutdownTimeoutMs if it is the default value or if it is 0
    if (svcConfig.getBrokerShutdownTimeoutMs() == unconfiguredDefaults.getBrokerShutdownTimeoutMs()
            || svcConfig.getBrokerShutdownTimeoutMs() == 0L) {
        svcConfig.setBrokerShutdownTimeoutMs(resolveBrokerShutdownTimeoutMs());
    }

    // adjust thread pool sizes if they are the default values
    if (svcConfig.getNumIOThreads() == unconfiguredDefaults.getNumIOThreads()) {
        svcConfig.setNumIOThreads(4);
    }
    if (svcConfig.getNumOrderedExecutorThreads() == unconfiguredDefaults.getNumOrderedExecutorThreads()) {
        // use a single threaded ordered executor by default
        svcConfig.setNumOrderedExecutorThreads(1);
    }
    if (svcConfig.getNumExecutorThreadPoolSize() == unconfiguredDefaults.getNumExecutorThreadPoolSize()) {
        svcConfig.setNumExecutorThreadPoolSize(5);
    }
    if (svcConfig.getNumCacheExecutorThreadPoolSize()
            == unconfiguredDefaults.getNumCacheExecutorThreadPoolSize()) {
        svcConfig.setNumCacheExecutorThreadPoolSize(2);
    }
    if (svcConfig.getNumHttpServerThreads() == unconfiguredDefaults.getNumHttpServerThreads()) {
        svcConfig.setNumHttpServerThreads(8);
    }

    // change the default value for ports so that a random port is used
    if (unconfiguredDefaults.getBrokerServicePort().equals(svcConfig.getBrokerServicePort())) {
        svcConfig.setBrokerServicePort(Optional.of(0));
    }
    if (unconfiguredDefaults.getWebServicePort().equals(svcConfig.getWebServicePort())) {
        svcConfig.setWebServicePort(Optional.of(0));
    }

    // change the default value for nic speed
    if (unconfiguredDefaults.getLoadBalancerOverrideBrokerNicSpeedGbps()
            .equals(svcConfig.getLoadBalancerOverrideBrokerNicSpeedGbps())) {
        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
    }

    // set the cluster name if it's unset
    if (svcConfig.getClusterName() == null) {
        svcConfig.setClusterName("test");
    }

    // adjust managed ledger cache size
    if (svcConfig.getManagedLedgerCacheSizeMB() == unconfiguredDefaults.getManagedLedgerCacheSizeMB()) {
        svcConfig.setManagedLedgerCacheSizeMB(8);
    }

    // adjust the topic load timeout
    if (svcConfig.getTopicLoadTimeoutSeconds() == unconfiguredDefaults.getTopicLoadTimeoutSeconds()) {
        svcConfig.setTopicLoadTimeoutSeconds(10);
    }
}
/**
 * Supplies the broker shutdown timeout that
 * {@link #defaultOverrideServiceConfiguration(ServiceConfiguration)} applies.
 * This base implementation returns 0; {@link StartableCustomBuilder} overrides it.
 *
 * @return the broker shutdown timeout in milliseconds
 */
protected long resolveBrokerShutdownTimeoutMs() {
    final long noShutdownTimeoutMs = 0L;
    return noShutdownTimeoutMs;
}
/**
 * Switches the builder to use Mockito spies by default for the PulsarService instance and its
 * collaborator objects. The current spy configuration builder is replaced by a new one, so
 * customizations made earlier with {@code spyConfigCustomizer} are not carried over.
 *
 * @see SpyConfig
 * @return the builder
 */
public Builder spyByDefault() {
    this.spyConfigBuilder = SpyConfig.builder(SpyConfig.SpyType.SPY);
    return this;
}
/**
 * Customizes the spy configuration by passing the current {@link SpyConfig.Builder} to the given function.
 * The function is invoked immediately.
 *
 * @param spyConfigCustomizer the function to customize the SpyConfig.Builder instance
 * @return the builder
 */
public Builder spyConfigCustomizer(Consumer<SpyConfig.Builder> spyConfigCustomizer) {
    final SpyConfig.Builder currentSpyConfigBuilder = this.spyConfigBuilder;
    spyConfigCustomizer.accept(currentSpyConfigBuilder);
    return this;
}
/**
 * Applies the given function to the ServiceConfiguration that is used to configure the PulsarService
 * instance. The function is invoked immediately on the builder's default configuration and, when a
 * configuration has been set explicitly on the builder, on that configuration as well.
 *
 * @param configCustomerizer the function to customize the ServiceConfiguration instance
 * @return the builder
 */
public Builder configCustomizer(Consumer<ServiceConfiguration> configCustomerizer) {
    configCustomerizer.accept(svcConfig);
    final ServiceConfiguration explicitConfig = config;
    if (explicitConfig == null) {
        // no explicitly provided configuration to customize
        return this;
    }
    configCustomerizer.accept(explicitConfig);
    return this;
}
/**
 * Sets the function that overrides configuration values in the ServiceConfiguration instance as a
 * last step when the context is built.
 * By default the overrides of
 * {@link PulsarTestContext.Builder#defaultOverrideServiceConfiguration(ServiceConfiguration)}
 * are used. They can be disabled with {@code .configOverride(null)}.
 *
 * @param configOverrideCustomizer the function that contains overrides to ServiceConfiguration,
 *                                 set to null to disable default overrides
 * @return the builder
 */
public Builder configOverride(Consumer<ServiceConfiguration> configOverrideCustomizer) {
    final Consumer<ServiceConfiguration> newOverrides = configOverrideCustomizer;
    this.configOverrideCustomizer = newOverrides;
    return this;
}
/**
 * Sets the function that customizes the PulsarService instance. Only the most recently set
 * function is kept.
 *
 * @param pulsarServiceCustomizer the function to customize the PulsarService instance
 * @return the builder
 */
public Builder pulsarServiceCustomizer(Consumer<PulsarService> pulsarServiceCustomizer) {
    final Consumer<PulsarService> newCustomizer = pulsarServiceCustomizer;
    this.pulsarServiceCustomizer = newCustomizer;
    return this;
}
/**
 * Reuses the BookKeeper client and the metadata stores of another PulsarTestContext.
 * <p>
 * When the other context has a mock ZooKeeper, the mock ZooKeeper instance(s) are reused.
 * Otherwise the other context's metadata stores are reused through non-closing proxies.
 *
 * @param otherContext the other PulsarTestContext
 * @return the builder
 */
public Builder reuseMockBookkeeperAndMetadataStores(PulsarTestContext otherContext) {
    bookKeeperClient(otherContext.getBookKeeperClient());

    final MockZooKeeper sharedZooKeeper = otherContext.getMockZooKeeper();
    if (sharedZooKeeper == null) {
        // no mock ZooKeeper to share: reuse the metadata stores themselves
        final MetadataStoreExtended localStoreProxy = NonClosingProxyHandler.createNonClosingProxy(
                otherContext.getLocalMetadataStore(), MetadataStoreExtended.class);
        localMetadataStore(localStoreProxy);
        final MetadataStoreExtended configurationStoreProxy = NonClosingProxyHandler.createNonClosingProxy(
                otherContext.getConfigurationMetadataStore(), MetadataStoreExtended.class);
        configurationMetadataStore(configurationStoreProxy);
        return this;
    }

    mockZooKeeper(sharedZooKeeper);
    final MockZooKeeper sharedGlobalZooKeeper = otherContext.getMockZooKeeperGlobal();
    if (sharedGlobalZooKeeper != null) {
        mockZooKeeperGlobal(sharedGlobalZooKeeper);
    }
    return this;
}
/**
 * Replaces the spy configuration of this builder with a builder derived from the
 * {@link SpyConfig} of another PulsarTestContext.
 *
 * @param otherContext the other PulsarTestContext
 * @return the builder
 */
public Builder reuseSpyConfig(PulsarTestContext otherContext) {
    final SpyConfig otherSpyConfig = otherContext.getSpyConfig();
    this.spyConfigBuilder = otherSpyConfig.toBuilder();
    return this;
}
/**
 * Chains closing of the other PulsarTestContext to this one by registering it as a closeable.
 * The other PulsarTestContext will be closed when this one is closed.
 *
 * @param otherContext the PulsarTestContext to close together with this one
 * @return the builder
 */
public Builder chainClosing(PulsarTestContext otherContext) {
    final AutoCloseable chainedContext = otherContext;
    registerCloseable(chainedContext);
    return this;
}
/**
 * Configures this PulsarTestContext to use a single mock ZooKeeper instance, i.e. without a
 * separate instance for the global (configuration) ZooKeeper.
 *
 * @return the builder
 */
public Builder withMockZookeeper() {
    final boolean useSeparateGlobalZk = false;
    return withMockZookeeper(useSeparateGlobalZk);
}
/**
 * Configures this PulsarTestContext to use a newly created mock ZooKeeper instance.
 * Any failure while creating a mock ZooKeeper is rethrown wrapped in a RuntimeException.
 *
 * @param useSeparateGlobalZk if true, the global (configuration) zookeeper will be a separate instance
 * @return the builder
 */
public Builder withMockZookeeper(boolean useSeparateGlobalZk) {
    try {
        final MockZooKeeper localZooKeeper = createMockZooKeeper();
        mockZooKeeper(localZooKeeper);
        if (!useSeparateGlobalZk) {
            return this;
        }
        final MockZooKeeper globalZooKeeper = createMockZooKeeper();
        mockZooKeeperGlobal(globalZooKeeper);
        return this;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Creates a MockZooKeeper that runs on a direct executor, creates the
 * {@code /ledgers/available/192.168.1.1:5000} and {@code /ledgers/LAYOUT} nodes in it
 * and registers its shutdown as a closeable of this builder.
 *
 * @return the prepared MockZooKeeper instance
 * @throws Exception if creating the instance or one of the nodes fails
 */
private MockZooKeeper createMockZooKeeper() throws Exception {
    final MockZooKeeper mockZk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
    final List<ACL> noAcls = new ArrayList<>(0);

    final byte[] emptyData = "".getBytes(StandardCharsets.UTF_8);
    ZkUtils.createFullPathOptimistic(mockZk, "/ledgers/available/192.168.1.1:" + 5000,
            emptyData, noAcls, CreateMode.PERSISTENT);

    final byte[] layoutData = "1\nflat:1".getBytes(StandardCharsets.UTF_8);
    mockZk.create("/ledgers/LAYOUT", layoutData, noAcls, CreateMode.PERSISTENT);

    registerCloseable(mockZk::shutdown);
    return mockZk;
}
/**
 * Applicable only when PulsarTestContext is not startable. Enables the use of
 * TestPulsarResources and the related spied resource classes when the context is built.
 *
 * @return the builder
 * @throws IllegalStateException if this builder is for a startable context
 */
public Builder useTestPulsarResources() {
    if (!startable) {
        useTestPulsarResources = true;
        return this;
    }
    throw new IllegalStateException("Cannot useTestPulsarResources when startable.");
}
/**
 * Applicable only when PulsarTestContext is not startable. Enables the use of
 * TestPulsarResources and related collaborator instances when the context is built.
 * The {@link NamespaceResources} and {@link TopicResources} instances will use the provided
 * {@link MetadataStore} instance.
 *
 * @param metadataStore the metadata store to use
 * @return the builder
 * @throws IllegalStateException if this builder is for a startable context
 */
public Builder useTestPulsarResources(MetadataStore metadataStore) {
    if (!startable) {
        useTestPulsarResources = true;
        pulsarResourcesMetadataStore = metadataStore;
        return this;
    }
    throw new IllegalStateException("Cannot useTestPulsarResources when startable.");
}
/**
 * Applicable only when PulsarTestContext is not startable. Wraps the given {@link BookKeeper}
 * and {@link ManagedLedgerFactory} instances in a {@link ManagedLedgerStorage} instance and sets
 * it as the managed ledger client factory for PulsarService.
 *
 * @param bookKeeperClient the bookkeeper client to use (mock bookkeeper)
 * @param managedLedgerFactory the managed ledger factory to use (could be a mock)
 * @return the builder
 */
public Builder managedLedgerClients(BookKeeper bookKeeperClient,
                                    ManagedLedgerFactory managedLedgerFactory) {
    final ManagedLedgerStorage storage =
            PulsarTestContext.createManagedLedgerClientFactory(bookKeeperClient, managedLedgerFactory);
    return managedLedgerClientFactory(storage);
}
/**
 * Sets the function to use for customizing the {@link BrokerService} instance when it gets created.
 * Only the most recently set function is kept.
 *
 * @param brokerServiceCustomizer the function that receives the BrokerService and returns the instance to use
 * @return the builder
 */
public Builder brokerServiceCustomizer(Function<BrokerService, BrokerService> brokerServiceCustomizer) {
    final Function<BrokerService, BrokerService> newCustomizer = brokerServiceCustomizer;
    this.brokerServiceCustomizer = newCustomizer;
    return this;
}
}
/**
* Internal class that contains customizations for the Lombok generated Builder.
*
* With Lombok, it is necessary to extend the generated Builder class for adding customizations related to
* instantiation and completing the builder.
*/
static abstract class AbstractCustomBuilder extends Builder {
/**
 * Creates the builder and fixes whether the PulsarService of the built context gets started.
 *
 * @param startable true if the PulsarService should be started when the context is built
 */
AbstractCustomBuilder(boolean startable) {
    // write the builder field directly; the startable(boolean) setter is disabled in this class
    super.startable = startable;
}
/**
 * Disables the builder setter for {@code startable}. Startability is fixed by the constructor
 * of this builder and cannot be changed afterwards.
 *
 * @param startable ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Builder startable(boolean startable) {
    throw new UnsupportedOperationException("Cannot change startability after builder creation.");
}
/**
 * Builds the PulsarTestContext. The steps are performed in this order:
 * <ol>
 * <li>the SpyConfig is built and stored</li>
 * <li>the builder's default ServiceConfiguration is used when no config was set</li>
 * <li>the config override function, if any, is applied to the config</li>
 * <li>broker interceptors are enabled in the config when a broker interceptor is set</li>
 * <li>the common collaborators and the PulsarService are created</li>
 * <li>the PulsarService customizer, if any, is applied</li>
 * <li>the PulsarService is started when the builder is startable; a failure is rethrown
 * wrapped in a RuntimeException</li>
 * <li>the BrokerService of the PulsarService is stored</li>
 * </ol>
 *
 * @return the built PulsarTestContext
 */
@Override
public final PulsarTestContext build() {
    final SpyConfig builtSpyConfig = spyConfigBuilder.build();
    spyConfig(builtSpyConfig);

    if (super.config == null) {
        config(svcConfig);
    }
    final Consumer<ServiceConfiguration> overrides = configOverrideCustomizer;
    if (overrides != null) {
        overrides.accept(super.config);
    }
    if (super.brokerInterceptor != null) {
        super.config.setDisableBrokerInterceptors(false);
    }

    initializeCommonPulsarServices(builtSpyConfig);
    initializePulsarServices(builtSpyConfig, this);

    final Consumer<PulsarService> serviceCustomizer = pulsarServiceCustomizer;
    if (serviceCustomizer != null) {
        serviceCustomizer.accept(super.pulsarService);
    }

    if (super.startable) {
        try {
            super.pulsarService.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    final BrokerService createdBrokerService = super.pulsarService.getBrokerService();
    brokerService(createdBrokerService);
    return super.build();
}
/**
* Creates the collaborators that were not provided to the builder: the executor and mock
* BookKeeper client, and the local and configuration metadata stores.
* The resources created here are registered as closeables so that closing the context releases them.
*
* @param spyConfig the spy configuration that decides how the mock BookKeeper client is instantiated
*/
private void initializeCommonPulsarServices(SpyConfig spyConfig) {
// create a mock BookKeeper client only when neither a client nor a managed ledger storage was provided
if (super.bookKeeperClient == null && super.managedLedgerClientFactory == null) {
if (super.executor == null) {
// single threaded executor that is handed to the mock BookKeeper client below
OrderedExecutor createdExecutor = OrderedExecutor.newBuilder().numThreads(1)
.name(PulsarTestContext.class.getSimpleName() + "-executor").build();
// registered before the mock BookKeeper closeable, so it is closed after it (close order is reversed)
registerCloseable(() -> GracefulExecutorServicesShutdown.initiate()
.timeout(Duration.ZERO)
.shutdown(createdExecutor)
.handle().get());
super.executor = createdExecutor;
}
NonClosableMockBookKeeper mockBookKeeper;
try {
mockBookKeeper =
spyConfig.getBookKeeperClient().spy(NonClosableMockBookKeeper.class, super.executor);
} catch (Exception e) {
throw new RuntimeException(e);
}
// the context shuts the mock down explicitly via reallyShutdown()
// NOTE(review): presumably the regular close() of NonClosableMockBookKeeper is a no-op - confirm
registerCloseable(() -> {
mockBookKeeper.reallyShutdown();
resetSpyOrMock(mockBookKeeper);
});
bookKeeperClient(mockBookKeeper);
}
// a provided managed ledger storage also supplies the BookKeeper client
if (super.bookKeeperClient == null && super.managedLedgerClientFactory != null) {
bookKeeperClient(super.managedLedgerClientFactory.getBookKeeperClient());
}
// fill in whichever metadata store is missing; stores that were provided are left untouched
if (super.localMetadataStore == null || super.configurationMetadataStore == null) {
if (super.mockZooKeeper != null) {
// metadata store backed by the mock ZooKeeper; it is also used as the configuration store
// unless a separate global mock ZooKeeper is set
MetadataStoreExtended mockZookeeperMetadataStore =
createMockZookeeperMetadataStore(super.mockZooKeeper, MetadataStoreConfig.METADATA_STORE);
if (super.localMetadataStore == null) {
localMetadataStore(mockZookeeperMetadataStore);
}
if (super.configurationMetadataStore == null) {
if (super.mockZooKeeperGlobal != null) {
configurationMetadataStore(createMockZookeeperMetadataStore(super.mockZooKeeperGlobal,
MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
} else {
configurationMetadataStore(mockZookeeperMetadataStore);
}
}
} else {
try {
// no mock ZooKeeper: a single "memory:local" store serves as both metadata stores
MetadataStoreExtended store = MetadataStoreFactoryImpl.createExtended("memory:local",
MetadataStoreConfig.builder().build());
registerCloseable(() -> {
store.close();
resetSpyOrMock(store);
});
// the builder hands out a non-closing proxy; the real store is closed by the closeable above
MetadataStoreExtended nonClosingProxy =
NonClosingProxyHandler.createNonClosingProxy(store, MetadataStoreExtended.class
);
if (super.localMetadataStore == null) {
localMetadataStore(nonClosingProxy);
}
if (super.configurationMetadataStore == null) {
configurationMetadataStore(nonClosingProxy);
}
} catch (MetadataStoreException e) {
throw new RuntimeException(e);
}
}
}
}
/**
 * Creates a {@link ZKMetadataStore} on top of a new session of the given MockZooKeeper and
 * returns it wrapped in a non-closing proxy. The session and the store are registered as
 * closeables of this builder; since closeables are closed in reverse order, the store is
 * closed before the session.
 *
 * @param mockZooKeeper the mock ZooKeeper that backs the store
 * @param metadataStoreName the name to set in the metadata store configuration
 * @return a non-closing proxy of the created metadata store
 */
private MetadataStoreExtended createMockZookeeperMetadataStore(MockZooKeeper mockZooKeeper,
                                                               String metadataStoreName) {
    // provide a unique session id for each instance
    final MockZooKeeperSession session = MockZooKeeperSession.newInstance(mockZooKeeper, false);
    registerCloseable(() -> {
        session.close();
        resetSpyOrMock(session);
    });

    final MetadataStoreConfig storeConfig =
            MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build();
    final ZKMetadataStore store = new ZKMetadataStore(session, storeConfig);
    registerCloseable(() -> {
        store.close();
        resetSpyOrMock(store);
    });

    return NonClosingProxyHandler.createNonClosingProxy(store, MetadataStoreExtended.class);
}
/**
* Creates the PulsarService for the context that is being built and sets it on the builder.
* It is called by {@code build()} after the common collaborators (BookKeeper client and metadata stores)
* have been initialized, with this builder instance passed as {@code builder}.
*
* @param spyConfig the spy configuration to use for creating the instances
* @param builder the builder that holds the collaborators for the PulsarService
*/
protected abstract void initializePulsarServices(SpyConfig spyConfig, Builder builder);
}
/**
 * Resets the given object with {@link Mockito#reset} when {@link MockUtil#isMock} reports it as
 * a Mockito mock. Any other object is left untouched.
 *
 * @param object the object to reset when it is Mockito managed
 */
static void resetSpyOrMock(Object object) {
    final boolean mockitoManaged = MockUtil.isMock(object);
    if (!mockitoManaged) {
        return;
    }
    Mockito.reset(object);
}
/**
 * Builder customizations for creating a PulsarTestContext that is "startable".
 * Setting a managed ledger client factory or PulsarResources is rejected by this builder.
 */
static class StartableCustomBuilder extends AbstractCustomBuilder {
    StartableCustomBuilder() {
        super(true);
    }

    /**
     * Not supported for a startable context.
     *
     * @throws IllegalStateException always
     */
    @Override
    public Builder managedLedgerClientFactory(ManagedLedgerStorage managedLedgerClientFactory) {
        throw new IllegalStateException("Cannot set managedLedgerClientFactory when startable.");
    }

    /**
     * Not supported for a startable context.
     *
     * @throws IllegalStateException always
     */
    @Override
    public Builder pulsarResources(PulsarResources pulsarResources) {
        throw new IllegalStateException("Cannot set pulsarResources when startable.");
    }

    /**
     * Creates the {@link StartableTestPulsarService}, registers a closeable that closes it and
     * sets it on the builder.
     * A {@link MockPulsarCompactionServiceFactory} is used when no compaction service factory
     * was set and the configured factory class name is {@link PulsarCompactionServiceFactory}.
     */
    @Override
    protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
        final BookKeeperClientFactory bkClientFactory =
                new MockBookKeeperClientFactory(builder.bookKeeperClient);

        CompactionServiceFactory effectiveCompactionFactory = builder.compactionServiceFactory;
        final boolean useMockCompactionFactory = builder.compactionServiceFactory == null
                && builder.config.getCompactionServiceFactoryClassName()
                .equals(PulsarCompactionServiceFactory.class.getName());
        if (useMockCompactionFactory) {
            effectiveCompactionFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
        }

        final PulsarService createdService = spyConfig.getPulsarService()
                .spy(StartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
                        builder.configurationMetadataStore, effectiveCompactionFactory,
                        builder.brokerInterceptor,
                        bkClientFactory, builder.brokerServiceCustomizer);
        if (effectiveCompactionFactory != null) {
            effectiveCompactionFactory.initialize(createdService);
        }

        registerCloseable(() -> {
            createdService.close();
            resetSpyOrMock(createdService);
        });
        pulsarService(createdService);
    }

    @Override
    protected long resolveBrokerShutdownTimeoutMs() {
        // wait 5 seconds for the startable pulsar service to shutdown gracefully
        // this reduces the chances that some threads of the PulsarTestsContexts of subsequent test executions
        // are running in parallel. It doesn't prevent it completely.
        final long gracefulShutdownTimeoutMs = 5000L;
        return gracefulShutdownTimeoutMs;
    }
}
/**
 * Builder customizations for creating a PulsarTestContext that is "non-startable".
 */
static class NonStartableCustomBuilder extends AbstractCustomBuilder {
    NonStartableCustomBuilder() {
        super(false);
    }

    /**
     * Creates the {@link NonStartableTestPulsarService}, registers a closeable that closes it and
     * sets it on the builder. Collaborators that were not provided are created first:
     * a managed ledger storage backed by a Mockito mock of {@link ManagedLedgerFactory}, and the
     * PulsarResources (TestPulsarResources when {@code useTestPulsarResources} is enabled).
     * A {@link MockPulsarCompactionServiceFactory} is used when no compaction service factory
     * was set and the configured factory class name is {@link PulsarCompactionServiceFactory}.
     */
    @Override
    protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
        if (builder.managedLedgerClientFactory == null) {
            final ManagedLedgerFactory mockedLedgerFactory = Mockito.mock(ManagedLedgerFactory.class);
            final ManagedLedgerStorage storage =
                    PulsarTestContext.createManagedLedgerClientFactory(builder.bookKeeperClient,
                            mockedLedgerFactory);
            managedLedgerClientFactory(storage);
        }

        if (builder.pulsarResources == null) {
            final SpyConfig.SpyType resourcesSpyType = spyConfig.getPulsarResources();
            if (!useTestPulsarResources) {
                pulsarResources(
                        resourcesSpyType.spy(PulsarResources.class, builder.localMetadataStore,
                                builder.configurationMetadataStore));
            } else {
                // the explicitly provided store wins, otherwise the configuration metadata store is used
                MetadataStore resourcesStore = pulsarResourcesMetadataStore;
                if (resourcesStore == null) {
                    resourcesStore = builder.configurationMetadataStore;
                }
                final NamespaceResources namespaceResources =
                        resourcesSpyType.spy(NamespaceResources.class, resourcesStore, 30);
                final TopicResources topicResources =
                        resourcesSpyType.spy(TopicResources.class, resourcesStore);
                pulsarResources(
                        resourcesSpyType.spy(
                                NonStartableTestPulsarService.TestPulsarResources.class,
                                builder.localMetadataStore,
                                builder.configurationMetadataStore,
                                topicResources, namespaceResources));
            }
        }

        final BookKeeperClientFactory bkClientFactory =
                new MockBookKeeperClientFactory(builder.bookKeeperClient);

        CompactionServiceFactory effectiveCompactionFactory = builder.compactionServiceFactory;
        final boolean useMockCompactionFactory = builder.compactionServiceFactory == null
                && builder.config.getCompactionServiceFactoryClassName()
                .equals(PulsarCompactionServiceFactory.class.getName());
        if (useMockCompactionFactory) {
            effectiveCompactionFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
        }

        final PulsarService createdService = spyConfig.getPulsarService()
                .spy(NonStartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
                        builder.configurationMetadataStore, effectiveCompactionFactory,
                        builder.brokerInterceptor,
                        bkClientFactory, builder.pulsarResources,
                        builder.managedLedgerClientFactory, builder.brokerServiceCustomizer);
        if (effectiveCompactionFactory != null) {
            effectiveCompactionFactory.initialize(createdService);
        }

        registerCloseable(() -> {
            createdService.close();
            resetSpyOrMock(createdService);
        });
        pulsarService(createdService);
    }
}
/**
 * Creates a {@link ManagedLedgerStorage} that simply hands out the given instances.
 * Its {@code initialize} and {@code close} methods do nothing and every call of
 * {@code getStatsProvider} returns a new {@link NullStatsProvider}.
 *
 * @param bookKeeperClient the BookKeeper client returned by the storage
 * @param managedLedgerFactory the managed ledger factory returned by the storage
 * @return the ManagedLedgerStorage wrapping the given instances
 */
@NotNull
private static ManagedLedgerStorage createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
                                                                     ManagedLedgerFactory managedLedgerFactory) {
    final ManagedLedgerStorage fixedStorage = new ManagedLedgerStorage() {
        @Override
        public ManagedLedgerFactory getManagedLedgerFactory() {
            return managedLedgerFactory;
        }

        @Override
        public BookKeeper getBookKeeperClient() {
            return bookKeeperClient;
        }

        @Override
        public StatsProvider getStatsProvider() {
            return new NullStatsProvider();
        }

        @Override
        public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
                               BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup)
                throws Exception {
            // nothing to initialize, the instances are provided up front
        }

        @Override
        public void close() throws IOException {
            // the provided instances are not owned by this storage
        }
    };
    return fixedStorage;
}
}