[IMPROVMENT] Reactify XUser authenticsation strategy (#2235)

diff --git a/server/data/data-api/src/main/java/org/apache/james/domainlist/api/DomainList.java b/server/data/data-api/src/main/java/org/apache/james/domainlist/api/DomainList.java
index 29cd268..99c06aa 100644
--- a/server/data/data-api/src/main/java/org/apache/james/domainlist/api/DomainList.java
+++ b/server/data/data-api/src/main/java/org/apache/james/domainlist/api/DomainList.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.james.core.Domain;
+import org.reactivestreams.Publisher;
 
 /**
  * This interface should be implemented by services which offer domains for
@@ -43,6 +44,8 @@
      */
     boolean containsDomain(Domain domain) throws DomainListException;
 
+    Publisher<Boolean> containsDomainReactive(Domain domain);
+
     /**
      * Add domain to the service
      * 
diff --git a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
index 943f2ac..5fdf723 100644
--- a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
+++ b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
@@ -26,9 +26,13 @@
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.Username;
 import org.apache.james.user.api.model.User;
+import org.apache.james.util.ReactorUtils;
 import org.reactivestreams.Publisher;
 
+import com.github.fge.lambdas.Throwing;
+
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * Interface for a repository of users. A repository represents a logical
@@ -175,6 +179,12 @@
         }
     }
 
+    default Mono<Void> assertValidReactive(Username username) {
+        return Mono.fromRunnable(Throwing.runnable(() -> assertValid(username)).sneakyThrow())
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+            .then();
+    }
+
     default Publisher<Username> listUsersOfADomainReactive(Domain domain) {
         return Flux.from(listReactive())
             .filter(username -> username.getDomainPart()
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
index 598a4e0..364ca65 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
@@ -35,11 +35,14 @@
 import org.apache.james.dnsservice.api.DNSService;
 import org.apache.james.domainlist.api.DomainListException;
 import org.apache.james.domainlist.lib.AbstractDomainList;
+import org.apache.james.util.ReactorUtils;
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraDomainList extends AbstractDomainList {
     private final CassandraAsyncExecutor executor;
     private final PreparedStatement readAllStatement;
@@ -88,6 +91,14 @@
     }
 
     @Override
+    public Mono<Boolean> containsDomainReactive(Domain domain) {
+        return executor.executeSingleRowOptional(readStatement.bind()
+            .set(DOMAIN, domain.asString(), TypeCodecs.TEXT))
+            .handle(ReactorUtils.publishIfPresent())
+            .hasElement();
+    }
+
+    @Override
     public void addDomain(Domain domain) throws DomainListException {
         boolean executed = executor.executeReturnApplied(insertStatement.bind()
                 .set(DOMAIN, domain.asString(), TypeCodecs.TEXT))
diff --git a/server/data/data-ldap/src/main/java/org/apache/james/user/ldap/ReadOnlyUsersLDAPRepository.java b/server/data/data-ldap/src/main/java/org/apache/james/user/ldap/ReadOnlyUsersLDAPRepository.java
index a5abdc8..7837d7a 100644
--- a/server/data/data-ldap/src/main/java/org/apache/james/user/ldap/ReadOnlyUsersLDAPRepository.java
+++ b/server/data/data-ldap/src/main/java/org/apache/james/user/ldap/ReadOnlyUsersLDAPRepository.java
@@ -28,9 +28,12 @@
 import org.apache.james.core.Username;
 import org.apache.james.domainlist.api.DomainList;
 import org.apache.james.lifecycle.api.Configurable;
+import org.apache.james.user.api.InvalidUsernameException;
 import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.user.lib.UsersRepositoryImpl;
 
+import reactor.core.publisher.Mono;
+
 /**
  * <p>
  * This repository implementation serves as a bridge between Apache James and
@@ -220,4 +223,18 @@
             assertDomainPartValid(username);
         }
     }
+
+    @Override
+    public Mono<Void> assertValidReactive(Username username) {
+        try {
+            assertLocalPartValid(username);
+            boolean localPartAsLoginUsernameSupported = ldapConfiguration.getResolveLocalPartAttribute().isPresent();
+            if (!localPartAsLoginUsernameSupported) {
+                return assertDomainPartValidReactive(username);
+            }
+            return Mono.empty();
+        } catch (InvalidUsernameException e) {
+            return Mono.error(e);
+        }
+    }
 }
diff --git a/server/data/data-library/pom.xml b/server/data/data-library/pom.xml
index cfb0487..0eaabea 100644
--- a/server/data/data-library/pom.xml
+++ b/server/data/data-library/pom.xml
@@ -78,6 +78,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/server/data/data-library/src/main/java/org/apache/james/domainlist/lib/AbstractDomainList.java b/server/data/data-library/src/main/java/org/apache/james/domainlist/lib/AbstractDomainList.java
index 85ed0ec..131d278 100644
--- a/server/data/data-library/src/main/java/org/apache/james/domainlist/lib/AbstractDomainList.java
+++ b/server/data/data-library/src/main/java/org/apache/james/domainlist/lib/AbstractDomainList.java
@@ -24,6 +24,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Stream;
 
@@ -36,20 +37,22 @@
 import org.apache.james.domainlist.api.DomainList;
 import org.apache.james.domainlist.api.DomainListException;
 import org.apache.james.lifecycle.api.Configurable;
+import org.apache.james.util.ReactorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 
+import reactor.core.publisher.Mono;
+
 /**
  * All implementations of the DomainList interface should extends this abstract
  * class
@@ -68,7 +71,7 @@
 
     private final DNSService dns;
     private final EnvDetector envDetector;
-    private LoadingCache<Domain, Boolean> cache;
+    private AsyncCache<Domain, Boolean> cache;
     private DomainListConfiguration configuration;
     private Domain defaultDomain;
 
@@ -91,14 +94,9 @@
     public void configure(DomainListConfiguration domainListConfiguration) throws ConfigurationException {
         this.configuration = domainListConfiguration;
 
-        this.cache = CacheBuilder.newBuilder()
+        cache = Caffeine.newBuilder()
             .expireAfterWrite(configuration.getCacheExpiracy())
-            .build(new CacheLoader<>() {
-                @Override
-                public Boolean load(Domain key) throws DomainListException {
-                    return containsDomainInternal(key) || detectedDomainsContains(key);
-                }
-            });
+            .buildAsync();
 
         configureDefaultDomain(domainListConfiguration.getDefaultDomain());
 
@@ -171,24 +169,46 @@
     public boolean containsDomain(Domain domain) throws DomainListException {
         if (configuration.isCacheEnabled()) {
             try {
-                return cache.get(domain);
+                return cache.get(domain, (key, executor) -> innerContains(domain).toFuture()).get();
             } catch (ExecutionException e) {
                 if (e.getCause() instanceof DomainListException) {
                     throw (DomainListException) e.getCause();
                 }
                 throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
             }
         } else {
-            boolean internalAnswer = containsDomainInternal(domain);
-            return internalAnswer || detectedDomainsContains(domain);
+            return innerContains(domain).block();
         }
     }
 
-    private boolean detectedDomainsContains(Domain domain) throws DomainListException {
-        if (configuration.isAutoDetect() || configuration.isAutoDetectIp()) {
-            return getDomains().contains(domain);
+    @Override
+    public Mono<Boolean> containsDomainReactive(Domain domain) {
+        if (configuration.isCacheEnabled()) {
+            return Mono.fromFuture(cache.get(domain, (key, executor) -> innerContains(domain).toFuture()));
+        } else {
+            return innerContains(domain);
         }
-        return false;
+    }
+
+    private Mono<Boolean> innerContains(Domain domain) {
+        return containsDomainInternalReactive(domain)
+            .flatMap(containsInternal -> {
+                if (containsInternal) {
+                    return Mono.just(true);
+                }
+                return detectedDomainsContains(domain);
+            });
+    }
+
+    private Mono<Boolean> detectedDomainsContains(Domain domain) {
+        if (configuration.isAutoDetect() || configuration.isAutoDetectIp()) {
+            return Mono.fromCallable(() -> getDomains().contains(domain))
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+        }
+        return Mono.just(false);
     }
 
     @Override
@@ -200,7 +220,7 @@
 
         if (configuration.isCacheEnabled()) {
             domainsWithType.get(DomainType.Internal)
-                .forEach(domain -> cache.put(domain, true));
+                .forEach(domain -> cache.put(domain, CompletableFuture.completedFuture(true)));
         }
 
         if (LOGGER.isDebugEnabled()) {
@@ -314,6 +334,11 @@
 
     protected abstract boolean containsDomainInternal(Domain domain) throws DomainListException;
 
+    protected Mono<Boolean> containsDomainInternalReactive(Domain domain) {
+        return Mono.fromCallable(() -> containsDomainInternal(domain))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+    }
+
     protected abstract void doRemoveDomain(Domain domain) throws DomainListException;
 
 }
diff --git a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
index 871e2f6..e22f459 100644
--- a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
+++ b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
@@ -45,8 +45,11 @@
 import org.reactivestreams.Publisher;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.base.CharMatcher;
 
+import reactor.core.publisher.Mono;
+
 public class UsersRepositoryImpl<T extends UsersDAO> implements UsersRepository, Configurable {
     public static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(UsersRepositoryImpl.class);
     private static String ILLEGAL_USERNAME_CHARACTERS = "\"(),:; <>@[\\]";
@@ -90,6 +93,12 @@
         assertLocalPartValid(username);
     }
 
+    @Override
+    public Mono<Void> assertValidReactive(Username username) {
+        return assertDomainPartValidReactive(username)
+            .then(Mono.fromRunnable(Throwing.runnable(() -> assertLocalPartValid(username)).sneakyThrow()));
+    }
+
     protected void assertDomainPartValid(Username username) throws UsersRepositoryException {
         if (supportVirtualHosting()) {
             // need a @ in the username
@@ -113,6 +122,33 @@
         }
     }
 
+    protected Mono<Void> assertDomainPartValidReactive(Username username) {
+        if (supportVirtualHosting()) {
+            // need a @ in the username
+            if (!username.hasDomainPart()) {
+                return Mono.error(new InvalidUsernameException("Given Username needs to contain a @domainpart"));
+            } else {
+                Domain domain = username.getDomainPart().get();
+                return Mono.from(domainList.containsDomainReactive(domain))
+                    .onErrorMap(DomainListException.class, e -> new UsersRepositoryException("Unable to query DomainList", e))
+                    .handle((result, sink) -> {
+                        if (!result) {
+                            sink.error(new InvalidUsernameException("Domain does not exist in DomainList"));
+                        } else {
+                            sink.complete();
+                        }
+                    });
+            }
+        } else {
+            // @ only allowed when virtualhosting is supported
+            if (username.hasDomainPart()) {
+                return Mono.error(new InvalidUsernameException("Given Username contains a @domainpart but virtualhosting support is disabled"));
+            } else {
+                return Mono.empty();
+            }
+        }
+    }
+
     protected void assertLocalPartValid(Username username) throws InvalidUsernameException {
         boolean isValid = CharMatcher.anyOf(ILLEGAL_USERNAME_CHARACTERS)
             .matchesNoneOf(username.getLocalPart());
diff --git a/server/data/data-library/src/test/java/org/apache/james/domainlist/api/mock/SimpleDomainList.java b/server/data/data-library/src/test/java/org/apache/james/domainlist/api/mock/SimpleDomainList.java
index f754bd3..e35e778 100644
--- a/server/data/data-library/src/test/java/org/apache/james/domainlist/api/mock/SimpleDomainList.java
+++ b/server/data/data-library/src/test/java/org/apache/james/domainlist/api/mock/SimpleDomainList.java
@@ -24,9 +24,12 @@
 import org.apache.james.core.Domain;
 import org.apache.james.domainlist.api.DomainList;
 import org.apache.james.domainlist.api.DomainListException;
+import org.reactivestreams.Publisher;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Simplest implementation for ManageableDomainList
  */
@@ -40,6 +43,11 @@
     }
 
     @Override
+    public Publisher<Boolean> containsDomainReactive(Domain domain) {
+        return Mono.fromCallable(() -> containsDomain(domain));
+    }
+
+    @Override
     public List<Domain> getDomains() {
         return ImmutableList.copyOf(domains);
     }
diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/http/XUserAuthenticationStrategy.java b/server/protocols/jmap/src/main/java/org/apache/james/jmap/http/XUserAuthenticationStrategy.java
index 604ed41..48f91ae 100644
--- a/server/protocols/jmap/src/main/java/org/apache/james/jmap/http/XUserAuthenticationStrategy.java
+++ b/server/protocols/jmap/src/main/java/org/apache/james/jmap/http/XUserAuthenticationStrategy.java
@@ -28,8 +28,6 @@
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
-import org.apache.james.util.ReactorUtils;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -62,14 +60,9 @@
     }
 
     private Mono<MailboxSession> createMailboxSession(Username username) {
-        return Mono.fromCallable(() -> {
-            try {
-                usersRepository.assertValid(username);
-            } catch (UsersRepositoryException e) {
-                throw new UnauthorizedException("Invalid username", e);
-            }
-            return mailboxManager.authenticate(username).withoutDelegation();
-        }).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+        return usersRepository.assertValidReactive(username)
+            .onErrorMap(e -> new UnauthorizedException("Invalid username", e))
+            .then(Mono.fromCallable(() -> mailboxManager.authenticate(username).withoutDelegation()));
     }
 
     @Override