blob: 83dee0708957cda7167d02646102293a7673c666 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.backends.rabbitmq;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.Optional;
import javax.inject.Inject;
import javax.net.ssl.SSLContext;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import static org.apache.james.backends.rabbitmq.RabbitMQConfiguration.SSLConfiguration.*;
public class RabbitMQConnectionFactory {
private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true;
private final ConnectionFactory connectionFactory;
private final RabbitMQConfiguration configuration;
@Inject
public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration) {
this.configuration = rabbitMQConfiguration;
this.connectionFactory = from(rabbitMQConfiguration);
}
private ConnectionFactory from(RabbitMQConfiguration rabbitMQConfiguration) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri(rabbitMQConfiguration.getUri());
connectionFactory.setHandshakeTimeout(rabbitMQConfiguration.getHandshakeTimeoutInMs());
connectionFactory.setShutdownTimeout(rabbitMQConfiguration.getShutdownTimeoutInMs());
connectionFactory.setChannelRpcTimeout(rabbitMQConfiguration.getChannelRpcTimeoutInMs());
connectionFactory.setConnectionTimeout(rabbitMQConfiguration.getConnectionTimeoutInMs());
connectionFactory.setNetworkRecoveryInterval(rabbitMQConfiguration.getNetworkRecoveryIntervalInMs());
connectionFactory.setUsername(rabbitMQConfiguration.getManagementCredentials().getUser());
connectionFactory.setPassword(String.valueOf(rabbitMQConfiguration.getManagementCredentials().getPassword()));
if (configuration.useSsl()) setupSslConfiguration(connectionFactory);
return connectionFactory;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void setupSslConfiguration(ConnectionFactory connectionFactory) {
try {
connectionFactory.useSslProtocol(sslContext(configuration));
setupHostNameVerification(connectionFactory);
} catch (KeyManagementException | NoSuchAlgorithmException | CertificateException | KeyStoreException | IOException | UnrecoverableKeyException e) {
throw new RuntimeException("Cannot set SSL options to the connection factory", e);
}
}
private SSLContext sslContext(RabbitMQConfiguration configuration) throws KeyManagementException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, UnrecoverableKeyException {
SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
RabbitMQConfiguration.SSLConfiguration sslConfiguration = configuration.getSslConfiguration();
setupSslValidationStrategy(sslContextBuilder, sslConfiguration);
setupClientCertificateAuthentication(sslContextBuilder, sslConfiguration);
return sslContextBuilder.build();
}
private void setupClientCertificateAuthentication(SSLContextBuilder sslContextBuilder, RabbitMQConfiguration.SSLConfiguration sslConfiguration) throws NoSuchAlgorithmException, KeyStoreException, UnrecoverableKeyException, CertificateException, IOException {
Optional<SSLKeyStore> keyStore = sslConfiguration.getKeyStore();
if (keyStore.isPresent()) {
SSLKeyStore sslKeyStore = keyStore.get();
sslContextBuilder.loadKeyMaterial(sslKeyStore.getFile(), sslKeyStore.getPassword(), null);
}
}
private void setupSslValidationStrategy(SSLContextBuilder sslContextBuilder, RabbitMQConfiguration.SSLConfiguration sslConfiguration) throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException {
SSLValidationStrategy strategy = sslConfiguration
.getStrategy();
switch (strategy) {
case DEFAULT:
break;
case IGNORE:
sslContextBuilder.loadTrustMaterial(TRUST_ALL);
break;
case OVERRIDE:
applyTrustStore(sslContextBuilder);
break;
default:
throw new NotImplementedException(
String.format("unrecognized strategy '%s'", strategy.name()));
}
}
private SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder) throws CertificateException, NoSuchAlgorithmException,
KeyStoreException, IOException {
SSLTrustStore trustStore = configuration.getSslConfiguration()
.getTrustStore()
.orElseThrow(() -> new IllegalStateException("SSLTrustStore cannot to be empty"));
return sslContextBuilder
.loadTrustMaterial(trustStore.getFile(), trustStore.getPassword());
}
private void setupHostNameVerification(ConnectionFactory connectionFactory) {
HostNameVerifier hostNameVerifier = configuration.getSslConfiguration()
.getHostNameVerifier();
if (hostNameVerifier == HostNameVerifier.DEFAULT) connectionFactory.enableHostnameVerification();
}
Connection create() {
return connectionMono().block();
}
Mono<Connection> connectionMono() {
return Mono.fromCallable(connectionFactory::newConnection)
.retryWhen(Retry.backoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs())).scheduler(Schedulers.elastic()));
}
}