blob: 2896f338e4a1f6861c122ffe4a02d19cf964b961 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.testcontext;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.Mockito.mock;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
/**
* This is an internal class used by {@link PulsarTestContext} as the {@link PulsarService} implementation
* for a "non-startable" PulsarService. Please see {@link PulsarTestContext} for more details.
*/
class NonStartableTestPulsarService extends AbstractTestPulsarService {

    /** Mockito mock created in the constructor and handed out by {@link #getNamespaceServiceProvider()}. */
    private final NamespaceService namespaceService;

    /**
     * Wires up the service from the supplied collaborators without ever calling {@link #start()}.
     *
     * <p>The first seven arguments are forwarded unchanged to the superclass constructor. After that the
     * constructor installs, in order: the given {@link PulsarResources}, the given {@link ManagedLedgerStorage},
     * a {@link TestBrokerService} obtained from {@code spyConfig.getBrokerService().spy(...)} and passed through
     * {@code brokerServiceCustomizer}, a {@link DefaultSchemaRegistryService} created via
     * {@code spyWithClassAndConstructorArgs}, a mocked {@link PulsarClientImpl} and a mocked
     * {@link NamespaceService}. When {@code config.isTransactionCoordinatorEnabled()} is true, the transaction
     * buffer provider and the pending-ack store provider are instantiated from the class names in {@code config}.
     *
     * <p>Checked exceptions raised along the way are rethrown wrapped in {@link RuntimeException}.
     *
     * @param spyConfig                  forwarded to the superclass; also supplies the broker service spy factory
     * @param config                     forwarded to the superclass; also read for the transaction settings
     * @param localMetadataStore         forwarded to the superclass
     * @param configurationMetadataStore forwarded to the superclass
     * @param compactionServiceFactory   forwarded to the superclass
     * @param brokerInterceptor          forwarded to the superclass
     * @param bookKeeperClientFactory    forwarded to the superclass
     * @param pulsarResources            installed through {@code setPulsarResources}
     * @param managedLedgerClientFactory installed through {@code setManagedLedgerClientFactory}
     * @param brokerServiceCustomizer    applied to the newly created broker service; its result is what gets
     *                                   installed through {@code setBrokerService}
     */
    public NonStartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
                                         MetadataStoreExtended localMetadataStore,
                                         MetadataStoreExtended configurationMetadataStore,
                                         CompactionServiceFactory compactionServiceFactory,
                                         BrokerInterceptor brokerInterceptor,
                                         BookKeeperClientFactory bookKeeperClientFactory,
                                         PulsarResources pulsarResources,
                                         ManagedLedgerStorage managedLedgerClientFactory,
                                         Function<BrokerService, BrokerService> brokerServiceCustomizer) {
        super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
                brokerInterceptor, bookKeeperClientFactory);

        setPulsarResources(pulsarResources);
        setManagedLedgerClientFactory(managedLedgerClientFactory);

        // Creation, customization and installation all stay inside one try so that any failure
        // from any of the three steps is wrapped the same way.
        try {
            BrokerService createdBrokerService =
                    spyConfig.getBrokerService().spy(TestBrokerService.class, this, getIoEventLoopGroup());
            setBrokerService(brokerServiceCustomizer.apply(createdBrokerService));
        } catch (Exception brokerServiceFailure) {
            throw new RuntimeException(brokerServiceFailure);
        }

        setSchemaRegistryService(spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class));
        setClient(mock(PulsarClientImpl.class));

        // The field must be assigned before startNamespaceService() runs: the provider returned by
        // getNamespaceServiceProvider() reads it.
        this.namespaceService = mock(NamespaceService.class);
        try {
            startNamespaceService();
        } catch (PulsarServerException namespaceFailure) {
            throw new RuntimeException(namespaceFailure);
        }

        if (!config.isTransactionCoordinatorEnabled()) {
            return;
        }

        // Transaction support: buffer provider first, then the pending-ack store provider.
        try {
            TransactionBufferProvider bufferProvider =
                    TransactionBufferProvider.newProvider(config.getTransactionBufferProviderClassName());
            setTransactionBufferProvider(bufferProvider);

            TransactionPendingAckStoreProvider pendingAckStoreProvider =
                    TransactionPendingAckStoreProvider.newProvider(
                            config.getTransactionPendingAckStoreProviderClassName());
            setTransactionPendingAckStoreProvider(pendingAckStoreProvider);
        } catch (IOException providerFailure) {
            throw new RuntimeException(providerFailure);
        }
    }

    /**
     * Always fails: this service variant is never started.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public void start() throws PulsarServerException {
        throw new UnsupportedOperationException("Cannot start a non-startable TestPulsarService");
    }

    /**
     * Returns a supplier that yields the {@link NamespaceService} mock held by this instance.
     *
     * @return supplier reading the {@code namespaceService} field each time it is invoked
     */
    @Override
    public Supplier<NamespaceService> getNamespaceServiceProvider() throws PulsarServerException {
        final Supplier<NamespaceService> provider = () -> this.namespaceService;
        return provider;
    }

    /**
     * Returns the client obtained from {@code getClient()} instead of building one from {@code clientConf}.
     *
     * @param clientConf not used by this implementation
     * @return the result of {@code getClient()}, cast to {@link PulsarClientImpl}
     * @throws PulsarClientException wrapping any {@link PulsarServerException} thrown by {@code getClient()}
     */
    @Override
    public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) throws PulsarClientException {
        final PulsarClientImpl existingClient;
        try {
            existingClient = (PulsarClientImpl) getClient();
        } catch (PulsarServerException serverFailure) {
            throw new PulsarClientException(serverFailure);
        }
        return existingClient;
    }

    /**
     * Returns whatever {@code getBrokerService()} currently yields rather than constructing a new instance.
     *
     * @param pulsar not used by this implementation
     * @return the result of {@code getBrokerService()}
     */
    @Override
    protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
        return getBrokerService();
    }

    /**
     * {@link BrokerService} subclass whose topic-properties lookup always yields an empty map.
     */
    static class TestBrokerService extends BrokerService {

        /**
         * Delegates straight to the {@link BrokerService} constructor.
         *
         * @param pulsar         passed to the superclass constructor
         * @param eventLoopGroup passed to the superclass constructor
         */
        TestBrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
            super(pulsar, eventLoopGroup);
        }

        /**
         * Reports no properties for any topic.
         *
         * @param topicName ignored
         * @return an already-completed future holding an empty map
         */
        @Override
        protected CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(TopicName topicName) {
            final Map<String, String> noProperties = Collections.emptyMap();
            return CompletableFuture.completedFuture(noProperties);
        }
    }

    /**
     * {@link PulsarResources} subclass that returns caller-supplied {@link TopicResources} and
     * {@link NamespaceResources} from the corresponding getters.
     */
    static class TestPulsarResources extends PulsarResources {

        /** Instance returned by {@link #getTopicResources()}. */
        private final TopicResources topicResources;

        /** Instance returned by {@link #getNamespaceResources()}. */
        private final NamespaceResources namespaceResources;

        /**
         * Stores the supplied resources after delegating both metadata stores to the superclass constructor.
         *
         * @param localMetadataStore         passed to the superclass constructor
         * @param configurationMetadataStore passed to the superclass constructor
         * @param topicResources             returned by {@link #getTopicResources()}
         * @param namespaceResources         returned by {@link #getNamespaceResources()}
         */
        public TestPulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
                                   TopicResources topicResources, NamespaceResources namespaceResources) {
            super(localMetadataStore, configurationMetadataStore);
            this.namespaceResources = namespaceResources;
            this.topicResources = topicResources;
        }

        /** @return the {@link TopicResources} given at construction time */
        @Override
        public TopicResources getTopicResources() {
            return this.topicResources;
        }

        /** @return the {@link NamespaceResources} given at construction time */
        @Override
        public NamespaceResources getNamespaceResources() {
            return this.namespaceResources;
        }
    }
}