[FIX] Mime parsing on parallel scheduler
For 20MB + files this takes time.
This approach avoids over-switching by doing it
only after fetching items...
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 5916502..f03f515 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -88,6 +88,8 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
public class StoreMessageIdManager implements MessageIdManager {
@@ -172,6 +174,7 @@
@Override
public Flux<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup fetchGroup, MailboxSession mailboxSession) {
+ // Here
MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
MessageMapper.FetchType fetchType = FetchGroupConverter.getFetchType(fetchGroup);
@@ -179,9 +182,17 @@
.groupBy(MailboxMessage::getMailboxId)
.filterWhen(groupedFlux -> hasRightsOnMailboxReactive(mailboxSession, Right.Read).apply(groupedFlux.key()), DEFAULT_CONCURRENCY)
.flatMap(Function.identity(), DEFAULT_CONCURRENCY)
+ .publishOn(forFetchType(fetchType))
.map(Throwing.function(messageResultConverter(fetchGroup)).sneakyThrow());
}
+ private Scheduler forFetchType(MessageMapper.FetchType fetchType) {
+ if (fetchType == MessageMapper.FetchType.FULL) {
+ return Schedulers.parallel();
+ }
+ return Schedulers.immediate();
+ }
+
@Override
public Publisher<ComposedMessageIdWithMetaData> messagesMetadata(Collection<MessageId> ids, MailboxSession session) {
MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(session);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index d2f5ac7..fc844cd 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -121,6 +121,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
@@ -769,11 +770,20 @@
@Override
public Publisher<MessageResult> getMessagesReactive(MessageRange set, FetchGroup fetchGroup, MailboxSession mailboxSession) {
+ FetchType fetchType = FetchGroupConverter.getFetchType(fetchGroup);
return Flux.from(mapperFactory.getMessageMapper(mailboxSession)
- .findInMailboxReactive(mailbox, set, FetchGroupConverter.getFetchType(fetchGroup), -1))
+ .findInMailboxReactive(mailbox, set, fetchType, -1))
+ .publishOn(forFetchType(fetchType))
.map(Throwing.<MailboxMessage, MessageResult>function(message -> ResultUtils.loadMessageResult(message, fetchGroup)).sneakyThrow());
}
+ private Scheduler forFetchType(MessageMapper.FetchType fetchType) {
+ if (fetchType == MessageMapper.FetchType.FULL) {
+ return Schedulers.parallel();
+ }
+ return Schedulers.immediate();
+ }
+
@Override
public Publisher<ComposedMessageIdWithMetaData> listMessagesMetadata(MessageRange set, MailboxSession session) {
MessageMapper messageMapper = mapperFactory.getMessageMapper(session);