JAMES-3440 JMAP tasks integration tests on top of the Distributed Server
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
index 84b1c7e..cd130fe 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
@@ -157,6 +157,64 @@
     }
 
     @Test
+    void recomputeFastViewProjectionItemsShouldComplete(GuiceJamesServer server) throws Exception {
+        server.getProbe(DataProbeImpl.class).addUser(USERNAME, "secret");
+        mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, MailboxConstants.INBOX);
+        mailboxProbe.appendMessage(
+            USERNAME,
+            MailboxPath.inbox(Username.of(USERNAME)),
+            new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes()),
+            new Date(),
+            false,
+            new Flags());
+
+        String taskId = with()
+            .post("/mailboxes?task=recomputeFastViewProjectionItems")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(notNullValue()))
+            .body("type", is("RecomputeAllFastViewProjectionItemsTask"))
+            .body("additionalInformation.processedMessageCount", is(1))
+            .body("additionalInformation.failedMessageCount", is(0));
+    }
+
+    @Test
+    void populateEmailQueryViewShouldComplete(GuiceJamesServer server) throws Exception {
+        server.getProbe(DataProbeImpl.class).addUser(USERNAME, "secret");
+        mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, MailboxConstants.INBOX);
+        mailboxProbe.appendMessage(
+            USERNAME,
+            MailboxPath.inbox(Username.of(USERNAME)),
+            new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes()),
+            new Date(),
+            false,
+            new Flags());
+
+        String taskId = with()
+            .post("/mailboxes?task=populateEmailQueryView")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(notNullValue()))
+            .body("type", is("PopulateEmailQueryViewTask"))
+            .body("additionalInformation.processedMessageCount", is(1))
+            .body("additionalInformation.failedMessageCount", is(0));
+    }
+
+    @Test
     void deleteMailsFromMailQueueShouldCompleteWhenSenderIsValid() {
         String firstMailQueue = with()
                 .basePath(MailQueueRoutes.BASE_URL)
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
index 2141fbd..2ea9f16 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
@@ -58,6 +58,7 @@
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class EmailQueryViewPopulator {
     private static final Logger LOGGER = LoggerFactory.getLogger(EmailQueryViewPopulator.class);
@@ -188,12 +189,14 @@
     }
 
     private Mono<MessageManager> retrieveMailbox(MailboxSession session, MailboxMetaData mailboxMetadata) {
-        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session));
+        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session))
+            .subscribeOn(Schedulers.elastic());
     }
 
     private Flux<MessageResult> listAllMessages(MessageManager messageManager, MailboxSession session) {
         try {
-            return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.HEADERS, session));
+            return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.HEADERS, session))
+                .subscribeOn(Schedulers.elastic());
         } catch (MailboxException e) {
             return Flux.error(e);
         }
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index ae8df05..da0a6d7 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -56,6 +56,7 @@
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class MessageFastViewProjectionCorrector {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageFastViewProjectionCorrector.class);
@@ -210,7 +211,8 @@
     }
 
     private Mono<MessageManager> retrieveMailbox(MailboxSession session, MailboxMetaData mailboxMetadata) {
-        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session));
+        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session))
+            .subscribeOn(Schedulers.elastic());
     }
 
     private Flux<ComposedMessageIdWithMetaData> listAllMailboxMessages(MessageManager messageManager, MailboxSession session) {
@@ -220,6 +222,7 @@
     private Mono<MessageResult> retrieveContent(MessageManager messageManager, MailboxSession session, MessageUid uid) {
         try {
             return Iterators.toFlux(messageManager.getMessages(MessageRange.one(uid), FetchGroup.FULL_CONTENT, session))
+                .subscribeOn(Schedulers.elastic())
                 .next();
         } catch (MailboxException e) {
             return Mono.error(e);