blob: 85fde7197973bcf255593544e9fcf67cd89bcb81 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.backends.rabbitmq;
import static org.apache.james.backends.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEMENT_CREDENTIAL;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.function.Consumer;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import com.github.fge.lambdas.consumers.ThrowingConsumer;
import reactor.rabbitmq.Sender;
public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback, ParameterResolver {
private static final Consumer<DockerRabbitMQ> DO_NOTHING = dockerRabbitMQ -> { };
public enum DockerRestartPolicy {
PER_TEST(DockerRabbitMQ::start, DockerRabbitMQ::start, DockerRabbitMQ::stop, DockerRabbitMQ::stop),
PER_CLASS(DockerRabbitMQ::start, DO_NOTHING, DO_NOTHING, DockerRabbitMQ::stop),
NEVER(DockerRabbitMQ::start, DO_NOTHING, DO_NOTHING, DO_NOTHING);
private final Consumer<DockerRabbitMQ> beforeAllCallback;
private final Consumer<DockerRabbitMQ> beforeEachCallback;
private final Consumer<DockerRabbitMQ> afterEachCallback;
private final Consumer<DockerRabbitMQ> afterAllCallback;
DockerRestartPolicy(Consumer<DockerRabbitMQ> beforeAllCallback,
Consumer<DockerRabbitMQ> beforeEachCallback,
Consumer<DockerRabbitMQ> afterEachCallback,
Consumer<DockerRabbitMQ> afterAllCallback) {
this.beforeAllCallback = beforeAllCallback;
this.beforeEachCallback = beforeEachCallback;
this.afterEachCallback = afterEachCallback;
this.afterAllCallback = afterAllCallback;
}
public void beforeAll(DockerRabbitMQ dockerRabbitMQ) {
beforeAllCallback.accept(dockerRabbitMQ);
}
public void afterAll(DockerRabbitMQ dockerRabbitMQ) {
afterAllCallback.accept(dockerRabbitMQ);
}
public void afterEach(DockerRabbitMQ dockerRabbitMQ) {
afterEachCallback.accept(dockerRabbitMQ);
}
public void beforeEach(DockerRabbitMQ dockerRabbitMQ) {
beforeEachCallback.accept(dockerRabbitMQ);
}
}
public enum IsolationPolicy {
WEAK(any -> {}),
STRONG(DockerRabbitMQ::reset);
private final ThrowingConsumer<DockerRabbitMQ> isolationCall;
IsolationPolicy(ThrowingConsumer<DockerRabbitMQ> isolationCall) {
this.isolationCall = isolationCall;
}
void enforceIsolation(DockerRabbitMQ container) {
isolationCall.accept(container);
}
}
@FunctionalInterface
public interface RequireRestartPolicy {
RequireIsolationPolicy restartPolicy(DockerRestartPolicy dockerRestartPolicy);
}
@FunctionalInterface
public interface RequireIsolationPolicy {
RabbitMQExtension isolationPolicy(IsolationPolicy isolationPolicy);
}
public static RequireIsolationPolicy singletonRabbitMQ() {
return isolationPolicy -> new RabbitMQExtension(DockerRabbitMQSingleton.SINGLETON, DockerRestartPolicy.NEVER, isolationPolicy);
}
public static RequireRestartPolicy defaultRabbitMQ() {
return dockerRabbitMQ(DockerRabbitMQ.withoutCookie());
}
public static RequireRestartPolicy dockerRabbitMQ(DockerRabbitMQ dockerRabbitMQ) {
return dockerRestartPolicy -> isolationPolicy -> new RabbitMQExtension(dockerRabbitMQ, dockerRestartPolicy, isolationPolicy);
}
private final DockerRabbitMQ rabbitMQ;
private final DockerRestartPolicy dockerRestartPolicy;
private final IsolationPolicy isolationPolicy;
private ReactorRabbitMQChannelPool channelPool;
private SimpleConnectionPool connectionPool;
public RabbitMQExtension(DockerRabbitMQ rabbitMQ,
DockerRestartPolicy dockerRestartPolicy, IsolationPolicy isolationPolicy) {
this.rabbitMQ = rabbitMQ;
this.dockerRestartPolicy = dockerRestartPolicy;
this.isolationPolicy = isolationPolicy;
}
@Override
public void beforeAll(ExtensionContext context) {
dockerRestartPolicy.beforeAll(rabbitMQ);
}
@Override
public void beforeEach(ExtensionContext extensionContext) throws Exception {
dockerRestartPolicy.beforeEach(rabbitMQ);
RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory();
connectionPool = new SimpleConnectionPool(connectionFactory,
SimpleConnectionPool.Configuration.builder()
.retries(2)
.initialDelay(Duration.ofMillis(5)));
channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
ReactorRabbitMQChannelPool.Configuration.builder()
.retries(2)
.minBorrowDelay(Duration.ofMillis(5))
.maxChannel(10));
channelPool.start();
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
channelPool.close();
connectionPool.close();
isolationPolicy.enforceIsolation(rabbitMQ);
dockerRestartPolicy.afterEach(rabbitMQ);
}
@Override
public void afterAll(ExtensionContext extensionContext) {
dockerRestartPolicy.afterAll(rabbitMQ);
}
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
return (parameterContext.getParameter().getType() == DockerRabbitMQ.class);
}
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
return rabbitMQ;
}
public ReactorRabbitMQChannelPool getRabbitChannelPool() {
return channelPool;
}
public Sender getSender() {
return channelPool.getSender();
}
public ReceiverProvider getReceiverProvider() {
return channelPool::createReceiver;
}
public SimpleConnectionPool getConnectionPool() {
return connectionPool;
}
public DockerRabbitMQ getRabbitMQ() {
return rabbitMQ;
}
public RabbitMQManagementAPI managementAPI() throws Exception {
return RabbitMQManagementAPI.from(RabbitMQConfiguration.builder()
.amqpUri(rabbitMQ.amqpUri())
.managementUri(rabbitMQ.managementUri())
.managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
.build());
}
private RabbitMQConnectionFactory createRabbitConnectionFactory() throws URISyntaxException {
return rabbitMQ.createRabbitConnectionFactory();
}
}