[FIX] Mime parsing on parallel scheduler

For 20MB + files this takes time.
This approach avoids over-switching by doing it
only after fetching items...
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 5916502..f03f515 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -88,6 +88,8 @@
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 public class StoreMessageIdManager implements MessageIdManager {
 
@@ -172,6 +174,7 @@
 
     @Override
     public Flux<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup fetchGroup, MailboxSession mailboxSession) {
+        // Here
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
         MessageMapper.FetchType fetchType = FetchGroupConverter.getFetchType(fetchGroup);
@@ -179,9 +182,17 @@
             .groupBy(MailboxMessage::getMailboxId)
             .filterWhen(groupedFlux -> hasRightsOnMailboxReactive(mailboxSession, Right.Read).apply(groupedFlux.key()), DEFAULT_CONCURRENCY)
             .flatMap(Function.identity(), DEFAULT_CONCURRENCY)
+            .publishOn(forFetchType(fetchType))
             .map(Throwing.function(messageResultConverter(fetchGroup)).sneakyThrow());
     }
 
+    private Scheduler forFetchType(MessageMapper.FetchType fetchType) {
+        if (fetchType == MessageMapper.FetchType.FULL) {
+            return Schedulers.parallel();
+        }
+        return Schedulers.immediate();
+    }
+
     @Override
     public Publisher<ComposedMessageIdWithMetaData> messagesMetadata(Collection<MessageId> ids, MailboxSession session) {
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(session);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index d2f5ac7..fc844cd 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -121,6 +121,7 @@
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 /**
@@ -769,11 +770,20 @@
 
     @Override
     public Publisher<MessageResult> getMessagesReactive(MessageRange set, FetchGroup fetchGroup, MailboxSession mailboxSession) {
+        FetchType fetchType = FetchGroupConverter.getFetchType(fetchGroup);
         return Flux.from(mapperFactory.getMessageMapper(mailboxSession)
-            .findInMailboxReactive(mailbox, set, FetchGroupConverter.getFetchType(fetchGroup), -1))
+            .findInMailboxReactive(mailbox, set, fetchType, -1))
+            .publishOn(forFetchType(fetchType))
             .map(Throwing.<MailboxMessage, MessageResult>function(message -> ResultUtils.loadMessageResult(message, fetchGroup)).sneakyThrow());
     }
 
+    private Scheduler forFetchType(MessageMapper.FetchType fetchType) {
+        if (fetchType == MessageMapper.FetchType.FULL) {
+            return Schedulers.parallel();
+        }
+        return Schedulers.immediate();
+    }
+
     @Override
     public Publisher<ComposedMessageIdWithMetaData> listMessagesMetadata(MessageRange set, MailboxSession session) {
         MessageMapper messageMapper = mapperFactory.getMessageMapper(session);