blob: 2ea9f167a9c75174b0e6835a28c16a259998834c [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.webadmin.data.jmap;
import static org.apache.james.mailbox.MailboxManager.MailboxSearchFetchType.Minimal;
import java.io.IOException;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import org.apache.james.jmap.api.projections.EmailQueryView;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.FetchGroup;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxMetaData;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.model.MessageResult;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mime4j.dom.Message;
import org.apache.james.mime4j.stream.MimeConfig;
import org.apache.james.task.Task;
import org.apache.james.task.Task.Result;
import org.apache.james.user.api.UsersRepository;
import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.ReactorUtils;
import org.apache.james.util.streams.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class EmailQueryViewPopulator {
private static final Logger LOGGER = LoggerFactory.getLogger(EmailQueryViewPopulator.class);
private static final Duration PERIOD = Duration.ofSeconds(1);
public static final int USER_CONCURRENCY = 1;
public static final int MAILBOX_CONCURRENCY = 1;
static class Progress {
private final AtomicLong processedUserCount;
private final AtomicLong processedMessageCount;
private final AtomicLong failedUserCount;
private final AtomicLong failedMessageCount;
Progress() {
failedUserCount = new AtomicLong();
processedMessageCount = new AtomicLong();
processedUserCount = new AtomicLong();
failedMessageCount = new AtomicLong();
}
private void incrementProcessedUserCount() {
processedUserCount.incrementAndGet();
}
private void incrementProcessedMessageCount() {
processedMessageCount.incrementAndGet();
}
private void incrementFailedUserCount() {
failedUserCount.incrementAndGet();
}
private void incrementFailedMessageCount() {
failedMessageCount.incrementAndGet();
}
long getProcessedUserCount() {
return processedUserCount.get();
}
long getProcessedMessageCount() {
return processedMessageCount.get();
}
long getFailedUserCount() {
return failedUserCount.get();
}
long getFailedMessageCount() {
return failedMessageCount.get();
}
}
private final UsersRepository usersRepository;
private final MailboxManager mailboxManager;
private final EmailQueryView emailQueryView;
@Inject
EmailQueryViewPopulator(UsersRepository usersRepository,
MailboxManager mailboxManager,
EmailQueryView emailQueryView) {
this.usersRepository = usersRepository;
this.mailboxManager = mailboxManager;
this.emailQueryView = emailQueryView;
}
Mono<Result> populateView(Progress progress, RunningOptions runningOptions) {
return correctProjection(listAllMailboxMessages(progress), runningOptions, progress);
}
private Flux<MessageResult> listAllMailboxMessages(Progress progress) {
try {
return Iterators.toFlux(usersRepository.list())
.map(mailboxManager::createSystemSession)
.doOnNext(any -> progress.incrementProcessedUserCount())
.flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY);
} catch (UsersRepositoryException e) {
return Flux.error(e);
}
}
private Flux<MessageResult> listUserMailboxMessages(Progress progress, MailboxSession session) {
return listUsersMailboxes(session)
.flatMap(mailboxMetadata -> retrieveMailbox(session, mailboxMetadata), MAILBOX_CONCURRENCY)
.flatMap(Throwing.function(messageManager -> listAllMessages(messageManager, session)), MAILBOX_CONCURRENCY)
.onErrorResume(MailboxException.class, e -> {
LOGGER.error("JMAP emailQuery view re-computation aborted for {} as we failed listing user mailboxes", session.getUser(), e);
progress.incrementFailedUserCount();
return Flux.empty();
});
}
private Mono<Result> correctProjection(MessageResult messageResult, Progress progress) {
return Mono.fromCallable(() -> {
MailboxId mailboxId = messageResult.getMailboxId();
MessageId messageId = messageResult.getMessageId();
ZonedDateTime receivedAt = ZonedDateTime.ofInstant(messageResult.getInternalDate().toInstant(), ZoneOffset.UTC);
Message mime4JMessage = parseMessage(messageResult);
Date sentAtDate = Optional.ofNullable(mime4JMessage.getDate()).orElse(messageResult.getInternalDate());
ZonedDateTime sentAt = ZonedDateTime.ofInstant(sentAtDate.toInstant(), ZoneOffset.UTC);
return new EmailQueryView.Entry(mailboxId, messageId, sentAt, receivedAt);
})
.flatMap(entry -> emailQueryView.save(entry.getMailboxId(), entry.getSentAt(), entry.getReceivedAt(), entry.getMessageId()))
.thenReturn(Result.COMPLETED)
.doOnSuccess(any -> progress.incrementProcessedMessageCount())
.onErrorResume(e -> {
LOGGER.error("JMAP emailQuery view re-computation aborted for {} - {} - {}",
messageResult.getMailboxId(),
messageResult.getMessageId(),
messageResult.getUid(), e);
progress.incrementFailedMessageCount();
return Mono.just(Result.PARTIAL);
});
}
private Mono<Result> correctProjection(Flux<MessageResult> entries, RunningOptions runningOptions, Progress progress) {
return entries.transform(ReactorUtils.<MessageResult, Result>throttle()
.elements(runningOptions.getMessagesPerSecond())
.per(PERIOD)
.forOperation(entry -> correctProjection(entry, progress)))
.reduce(Task::combine)
.switchIfEmpty(Mono.just(Result.COMPLETED));
}
private Flux<MailboxMetaData> listUsersMailboxes(MailboxSession session) {
return mailboxManager.search(MailboxQuery.privateMailboxesBuilder(session).build(), Minimal, session);
}
private Mono<MessageManager> retrieveMailbox(MailboxSession session, MailboxMetaData mailboxMetadata) {
return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session))
.subscribeOn(Schedulers.elastic());
}
private Flux<MessageResult> listAllMessages(MessageManager messageManager, MailboxSession session) {
try {
return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.HEADERS, session))
.subscribeOn(Schedulers.elastic());
} catch (MailboxException e) {
return Flux.error(e);
}
}
private Message parseMessage(MessageResult messageResult) throws IOException, MailboxException {
return Message.Builder
.of()
.use(MimeConfig.PERMISSIVE)
.parse(messageResult.getFullContent().getInputStream())
.build();
}
}