JAMES-3502, add rabbitmq tls support in connection factory
diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml
index 1bfe6ba..99a0507 100644
--- a/backends-common/rabbitmq/pom.xml
+++ b/backends-common/rabbitmq/pom.xml
@@ -94,6 +94,11 @@
             <artifactId>commons-pool2</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>4.4.5</version>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
             <scope>test</scope>
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
index cb45fc7..83dee07 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
@@ -18,27 +18,42 @@
  ****************************************************************/
 package org.apache.james.backends.rabbitmq;
 
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
 import java.time.Duration;
+import java.util.Optional;
 
 import javax.inject.Inject;
+import javax.net.ssl.SSLContext;
 
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.retry.Retry;
 
+import static org.apache.james.backends.rabbitmq.RabbitMQConfiguration.SSLConfiguration.*;
+
 public class RabbitMQConnectionFactory {
 
+    private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true;
+
     private final ConnectionFactory connectionFactory;
 
     private final RabbitMQConfiguration configuration;
 
     @Inject
     public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration) {
-        this.connectionFactory = from(rabbitMQConfiguration);
         this.configuration = rabbitMQConfiguration;
+        this.connectionFactory = from(rabbitMQConfiguration);
     }
 
     private ConnectionFactory from(RabbitMQConfiguration rabbitMQConfiguration) {
@@ -54,18 +69,89 @@
             connectionFactory.setUsername(rabbitMQConfiguration.getManagementCredentials().getUser());
             connectionFactory.setPassword(String.valueOf(rabbitMQConfiguration.getManagementCredentials().getPassword()));
 
+            if (configuration.useSsl()) setupSslConfiguration(connectionFactory);
+
             return connectionFactory;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
+    private void setupSslConfiguration(ConnectionFactory connectionFactory) {
+        try {
+            connectionFactory.useSslProtocol(sslContext(configuration));
+            setupHostNameVerification(connectionFactory);
+        } catch (KeyManagementException | NoSuchAlgorithmException | CertificateException | KeyStoreException | IOException | UnrecoverableKeyException e) {
+            throw new RuntimeException("Cannot set SSL options to the connection factory", e);
+        }
+    }
+
+    private SSLContext sslContext(RabbitMQConfiguration configuration) throws KeyManagementException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, UnrecoverableKeyException {
+        SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
+
+        RabbitMQConfiguration.SSLConfiguration sslConfiguration = configuration.getSslConfiguration();
+
+        setupSslValidationStrategy(sslContextBuilder, sslConfiguration);
+
+        setupClientCertificateAuthentication(sslContextBuilder, sslConfiguration);
+
+        return sslContextBuilder.build();
+
+    }
+
+    private void setupClientCertificateAuthentication(SSLContextBuilder sslContextBuilder, RabbitMQConfiguration.SSLConfiguration sslConfiguration) throws NoSuchAlgorithmException, KeyStoreException, UnrecoverableKeyException, CertificateException, IOException {
+        Optional<SSLKeyStore> keyStore = sslConfiguration.getKeyStore();
+
+        if (keyStore.isPresent()) {
+            SSLKeyStore sslKeyStore = keyStore.get();
+
+            sslContextBuilder.loadKeyMaterial(sslKeyStore.getFile(), sslKeyStore.getPassword(), null);
+        }
+    }
+
+    private void setupSslValidationStrategy(SSLContextBuilder sslContextBuilder, RabbitMQConfiguration.SSLConfiguration sslConfiguration) throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException {
+        SSLValidationStrategy strategy = sslConfiguration
+                .getStrategy();
+
+        switch (strategy) {
+            case DEFAULT:
+                break;
+            case IGNORE:
+                sslContextBuilder.loadTrustMaterial(TRUST_ALL);
+                break;
+            case OVERRIDE:
+                applyTrustStore(sslContextBuilder);
+                break;
+            default:
+                throw new NotImplementedException(
+                        String.format("unrecognized strategy '%s'", strategy.name()));
+        }
+    }
+
+    private SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder) throws CertificateException, NoSuchAlgorithmException,
+            KeyStoreException, IOException {
+
+        SSLTrustStore trustStore = configuration.getSslConfiguration()
+                .getTrustStore()
+                .orElseThrow(() -> new IllegalStateException("SSLTrustStore cannot to be empty"));
+
+        return sslContextBuilder
+                .loadTrustMaterial(trustStore.getFile(), trustStore.getPassword());
+    }
+
+    private void setupHostNameVerification(ConnectionFactory connectionFactory) {
+        HostNameVerifier hostNameVerifier = configuration.getSslConfiguration()
+                .getHostNameVerifier();
+
+        if (hostNameVerifier == HostNameVerifier.DEFAULT) connectionFactory.enableHostnameVerification();
+    }
+
     Connection create() {
         return connectionMono().block();
     }
 
     Mono<Connection> connectionMono() {
         return Mono.fromCallable(connectionFactory::newConnection)
-            .retryWhen(Retry.backoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs())).scheduler(Schedulers.elastic()));
+                .retryWhen(Retry.backoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs())).scheduler(Schedulers.elastic()));
     }
 }