JAMES-3502, add rabbitmq tls support in connection factory
diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml
index 1bfe6ba..99a0507 100644
--- a/backends-common/rabbitmq/pom.xml
+++ b/backends-common/rabbitmq/pom.xml
@@ -94,6 +94,11 @@
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.5</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>test</scope>
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
index cb45fc7..83dee07 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
@@ -18,27 +18,42 @@
****************************************************************/
package org.apache.james.backends.rabbitmq;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
import java.time.Duration;
+import java.util.Optional;
import javax.inject.Inject;
+import javax.net.ssl.SSLContext;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
+import static org.apache.james.backends.rabbitmq.RabbitMQConfiguration.SSLConfiguration.*;
+
public class RabbitMQConnectionFactory {
+ private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true;
+
private final ConnectionFactory connectionFactory;
private final RabbitMQConfiguration configuration;
@Inject
public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration) {
- this.connectionFactory = from(rabbitMQConfiguration);
this.configuration = rabbitMQConfiguration;
+ this.connectionFactory = from(rabbitMQConfiguration);
}
private ConnectionFactory from(RabbitMQConfiguration rabbitMQConfiguration) {
@@ -54,18 +69,89 @@
connectionFactory.setUsername(rabbitMQConfiguration.getManagementCredentials().getUser());
connectionFactory.setPassword(String.valueOf(rabbitMQConfiguration.getManagementCredentials().getPassword()));
+ if (configuration.useSsl()) setupSslConfiguration(connectionFactory);
+
return connectionFactory;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ private void setupSslConfiguration(ConnectionFactory connectionFactory) {
+ try {
+ connectionFactory.useSslProtocol(sslContext(configuration));
+ setupHostNameVerification(connectionFactory);
+ } catch (KeyManagementException | NoSuchAlgorithmException | CertificateException | KeyStoreException | IOException | UnrecoverableKeyException e) {
+ throw new RuntimeException("Cannot set SSL options to the connection factory", e);
+ }
+ }
+
+ private SSLContext sslContext(RabbitMQConfiguration configuration) throws KeyManagementException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, UnrecoverableKeyException {
+ SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
+
+ RabbitMQConfiguration.SSLConfiguration sslConfiguration = configuration.getSslConfiguration();
+
+ setupSslValidationStrategy(sslContextBuilder, sslConfiguration);
+
+ setupClientCertificateAuthentication(sslContextBuilder, sslConfiguration);
+
+ return sslContextBuilder.build();
+
+ }
+
+ private void setupClientCertificateAuthentication(SSLContextBuilder sslContextBuilder, RabbitMQConfiguration.SSLConfiguration sslConfiguration) throws NoSuchAlgorithmException, KeyStoreException, UnrecoverableKeyException, CertificateException, IOException {
+ Optional<SSLKeyStore> keyStore = sslConfiguration.getKeyStore();
+
+ if (keyStore.isPresent()) {
+ SSLKeyStore sslKeyStore = keyStore.get();
+
+ sslContextBuilder.loadKeyMaterial(sslKeyStore.getFile(), sslKeyStore.getPassword(), null);
+ }
+ }
+
+ private void setupSslValidationStrategy(SSLContextBuilder sslContextBuilder, RabbitMQConfiguration.SSLConfiguration sslConfiguration) throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException {
+ SSLValidationStrategy strategy = sslConfiguration
+ .getStrategy();
+
+ switch (strategy) {
+ case DEFAULT:
+ break;
+ case IGNORE:
+ sslContextBuilder.loadTrustMaterial(TRUST_ALL);
+ break;
+ case OVERRIDE:
+ applyTrustStore(sslContextBuilder);
+ break;
+ default:
+ throw new NotImplementedException(
+ String.format("unrecognized strategy '%s'", strategy.name()));
+ }
+ }
+
+ private SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder) throws CertificateException, NoSuchAlgorithmException,
+ KeyStoreException, IOException {
+
+ SSLTrustStore trustStore = configuration.getSslConfiguration()
+ .getTrustStore()
+ .orElseThrow(() -> new IllegalStateException("SSLTrustStore cannot to be empty"));
+
+ return sslContextBuilder
+ .loadTrustMaterial(trustStore.getFile(), trustStore.getPassword());
+ }
+
+ private void setupHostNameVerification(ConnectionFactory connectionFactory) {
+ HostNameVerifier hostNameVerifier = configuration.getSslConfiguration()
+ .getHostNameVerifier();
+
+ if (hostNameVerifier == HostNameVerifier.DEFAULT) connectionFactory.enableHostnameVerification();
+ }
+
Connection create() {
return connectionMono().block();
}
Mono<Connection> connectionMono() {
return Mono.fromCallable(connectionFactory::newConnection)
- .retryWhen(Retry.backoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs())).scheduler(Schedulers.elastic()));
+ .retryWhen(Retry.backoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs())).scheduler(Schedulers.elastic()));
}
}