blob: 4e7485030cb135881ab7bc393b299bb9a8535b6e [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.backends.rabbitmq;
import java.io.IOException;
import java.time.Duration;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.BiConsumer;
import javax.annotation.PreDestroy;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.james.lifecycle.api.Startable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownSignalException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ChannelPool;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;
public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private static class ChannelClosedException extends IOException {
ChannelClosedException(String message) {
super(message);
}
}
static class ChannelFactory extends BasePooledObjectFactory<Channel> {
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelFactory.class);
private final Mono<Connection> connectionMono;
private final Configuration configuration;
ChannelFactory(Mono<Connection> connectionMono, Configuration configuration) {
this.connectionMono = connectionMono;
this.configuration = configuration;
}
@Override
public Channel create() {
return connectionMono
.flatMap(this::openChannel)
.block();
}
private Mono<Channel> openChannel(Connection connection) {
return Mono.fromCallable(connection::openChannel)
.map(maybeChannel ->
maybeChannel.orElseThrow(() -> new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels")))
.retryWhen(configuration.backoffSpec().scheduler(Schedulers.elastic()))
.doOnError(throwable -> LOGGER.error("error when creating new channel", throwable));
}
@Override
public PooledObject<Channel> wrap(Channel obj) {
return new DefaultPooledObject<>(obj);
}
@Override
public void destroyObject(PooledObject<Channel> pooledObject) throws Exception {
Channel channel = pooledObject.getObject();
if (channel.isOpen()) {
channel.close();
}
}
}
public static class Configuration {
@FunctionalInterface
public interface RequiresRetries {
RequiredMinBorrowDelay retries(int retries);
}
@FunctionalInterface
public interface RequiredMinBorrowDelay {
RequiredMaxChannel minBorrowDelay(Duration minBorrowDelay);
}
@FunctionalInterface
public interface RequiredMaxChannel {
Configuration maxChannel(int maxChannel);
}
public static final Configuration DEFAULT = builder()
.retries(MAX_BORROW_RETRIES)
.minBorrowDelay(MIN_BORROW_DELAY)
.maxChannel(MAX_CHANNELS_NUMBER);
public static RequiresRetries builder() {
return retries -> minBorrowDelay -> maxChannel -> new Configuration(minBorrowDelay, retries, maxChannel);
}
public static Configuration from(org.apache.commons.configuration2.Configuration configuration) {
Duration minBorrowDelay = Optional.ofNullable(configuration.getLong("channel.pool.min.delay.ms", null))
.map(Duration::ofMillis)
.orElse(MIN_BORROW_DELAY);
return builder()
.retries(configuration.getInt("channel.pool.retries", MAX_BORROW_RETRIES))
.minBorrowDelay(minBorrowDelay)
.maxChannel(configuration.getInt("channel.pool.size", MAX_CHANNELS_NUMBER));
}
private final Duration minBorrowDelay;
private final int retries;
private final int maxChannel;
public Configuration(Duration minBorrowDelay, int retries, int maxChannel) {
this.minBorrowDelay = minBorrowDelay;
this.retries = retries;
this.maxChannel = maxChannel;
}
private RetryBackoffSpec backoffSpec() {
return Retry.backoff(retries, minBorrowDelay);
}
public int getMaxChannel() {
return maxChannel;
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(ReactorRabbitMQChannelPool.class);
private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
private static final int MAX_CHANNELS_NUMBER = 3;
private static final int MAX_BORROW_RETRIES = 3;
private static final Duration MIN_BORROW_DELAY = Duration.ofMillis(50);
private final Mono<Connection> connectionMono;
private final GenericObjectPool<Channel> pool;
private final ConcurrentSkipListSet<Channel> borrowedChannels;
private final Configuration configuration;
private Sender sender;
public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, Configuration configuration) {
this.connectionMono = connectionMono;
this.configuration = configuration;
ChannelFactory channelFactory = new ChannelFactory(connectionMono, configuration);
GenericObjectPoolConfig<Channel> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(configuration.getMaxChannel());
this.pool = new GenericObjectPool<>(channelFactory, config);
this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode));
}
public void start() {
sender = createSender();
}
public Sender getSender() {
return sender;
}
public Receiver createReceiver() {
return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
}
@Override
public Mono<? extends Channel> getChannelMono() {
return borrow();
}
private Mono<Channel> borrow() {
return tryBorrowFromPool()
.doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
.retryWhen(configuration.backoffSpec().scheduler(Schedulers.elastic()))
.onErrorMap(this::propagateException)
.subscribeOn(Schedulers.elastic())
.doOnNext(borrowedChannels::add);
}
private Mono<Channel> tryBorrowFromPool() {
return Mono.fromCallable(this::borrowFromPool);
}
private Throwable propagateException(Throwable throwable) {
if (throwable instanceof IllegalStateException
&& throwable.getMessage().contains("Retries exhausted")) {
return throwable.getCause();
}
return throwable;
}
private Channel borrowFromPool() throws Exception {
Channel channel = pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS);
if (!channel.isOpen()) {
invalidateObject(channel);
throw new ChannelClosedException("borrowed channel is already closed");
}
return channel;
}
@Override
public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
return (signalType, channel) -> {
borrowedChannels.remove(channel);
if (!channel.isOpen() || signalType != SignalType.ON_COMPLETE) {
invalidateObject(channel);
return;
}
pool.returnObject(channel);
};
}
private Sender createSender() {
return RabbitFlux.createSender(new SenderOptions()
.connectionMono(connectionMono)
.channelPool(this)
.resourceManagementChannelMono(
connectionMono.map(Throwing.function(Connection::createChannel)).cache()));
}
public Mono<Void> createWorkQueue(QueueSpecification queueSpecification, BindingSpecification bindingSpecification) {
Preconditions.checkArgument(queueSpecification.getName() != null, "WorkQueue pattern do not make sense for unnamed queues");
Preconditions.checkArgument(queueSpecification.getName().equals(bindingSpecification.getQueue()),
"Binding needs to be targetting the created queue %s instead of %s",
queueSpecification.getName(), bindingSpecification.getQueue());
return Flux.concat(
Mono.using(this::createSender,
managementSender -> managementSender.declareQueue(queueSpecification),
Sender::close)
.onErrorResume(
e -> e instanceof ShutdownSignalException
&& e.getMessage().contains("reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue"),
e -> {
LOGGER.warn("{} already exists without dead-letter setup. Dead lettered messages to it will be lost. " +
"To solve this, re-create the queue with the x-dead-letter-exchange argument set up.",
queueSpecification.getName());
return Mono.empty();
}),
sender.bind(bindingSpecification))
.then();
}
private void invalidateObject(Channel channel) {
try {
pool.invalidateObject(channel);
if (channel.isOpen()) {
channel.close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@PreDestroy
@Override
public void close() {
sender.close();
borrowedChannels.forEach(channel -> getChannelCloseHandler().accept(SignalType.ON_NEXT, channel));
borrowedChannels.clear();
pool.close();
}
public Mono<Boolean> tryChannel() {
return Mono.usingWhen(borrow(),
channel -> Mono.just(channel.isOpen()),
channel -> {
if (channel != null) {
borrowedChannels.remove(channel);
pool.returnObject(channel);
}
return Mono.empty();
})
.onErrorResume(any -> Mono.just(false));
}
}