blob: 63379ad65357388b11841c3134e404e58b46f3d0 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
package org.apache.james.backends.rabbitmq;
import static org.apache.james.util.ReactorUtils.publishIfPresent;
import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.james.lifecycle.api.Startable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.rabbitmq.client.Connection;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
public class SimpleConnectionPool implements AutoCloseable, Startable {
public static final Logger LOGGER = LoggerFactory.getLogger(SimpleConnectionPool.class);
public static class Configuration {
public interface RequiresRetries {
RequiresInitialDelay retries(int retries);
public interface RequiresInitialDelay {
Configuration initialDelay(Duration minBorrowDelay);
public static final Configuration DEFAULT = builder()
public static RequiresRetries builder() {
return retries -> initialDelay -> new Configuration(retries, initialDelay);
public static Configuration from(org.apache.commons.configuration2.Configuration configuration) {
return builder()
.retries(configuration.getInt("connection.pool.retries", 10))
.initialDelay(Duration.ofMillis(configuration.getLong("", 100)));
private final int numRetries;
private final Duration initialDelay;
public Configuration(int numRetries, Duration initialDelay) {
this.numRetries = numRetries;
this.initialDelay = initialDelay;
public int getNumRetries() {
return numRetries;
public Duration getInitialDelay() {
return initialDelay;
public interface ReconnectionHandler {
Publisher<Void> handleReconnection(Connection connection);
private final AtomicReference<Connection> connectionReference;
private final RabbitMQConnectionFactory connectionFactory;
private final Configuration configuration;
private Set<ReconnectionHandler> reconnectionHandlers;
public SimpleConnectionPool(RabbitMQConnectionFactory factory, Configuration configuration) {
this.connectionFactory = factory;
this.reconnectionHandlers = new HashSet<>();
this.configuration = configuration;
this.connectionReference = new AtomicReference<>();
public void init(Set<ReconnectionHandler> reconnectionHandlers) {
this.reconnectionHandlers = reconnectionHandlers;
public void close() {
public Mono<Connection> getResilientConnection() {
return Mono.defer(this::getOpenConnection)
.retryWhen(Retry.backoff(configuration.getNumRetries(), configuration.getInitialDelay()).scheduler(Schedulers.boundedElastic()));
private Mono<Connection> getOpenConnection() {
Connection previous = connectionReference.get();
Connection current = Optional.ofNullable(previous)
boolean updated = connectionReference.compareAndSet(previous, current);
if (updated) {
if (previous != null && previous != current) {
LOGGER.warn("Replacing current RabbitMQ connection...");
return Flux.fromIterable(reconnectionHandlers)
.concatMap(handler -> handler.handleReconnection(current))
return Mono.just(current);
} else {
try {
} catch (IOException e) {
//error below
return Mono.error(new RuntimeException("unable to create and register a new Connection"));
public Mono<Boolean> tryConnection() {
return getOpenConnection()
.onErrorResume(any -> Mono.just(false));
public Mono<RabbitMQServerVersion> version() {
return getOpenConnection()
.map(serverProperties -> Optional.ofNullable(serverProperties.get("version")))
.onErrorResume(any -> Mono.empty());