blob: f7b78c69c5f8c55bd98bf16e4234e89f129b6835 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.jmap.event;
import static jakarta.mail.Flags.Flag.DELETED;
import static org.apache.james.util.ReactorUtils.publishIfPresent;
import java.io.IOException;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Optional;
import jakarta.inject.Inject;
import org.apache.james.events.Event;
import org.apache.james.events.EventListener.ReactiveGroupEventListener;
import org.apache.james.events.Group;
import org.apache.james.jmap.api.projections.EmailQueryView;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageIdManager;
import org.apache.james.mailbox.Role;
import org.apache.james.mailbox.SessionProvider;
import org.apache.james.mailbox.events.MailboxEvents.Added;
import org.apache.james.mailbox.events.MailboxEvents.Expunged;
import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
import org.apache.james.mailbox.events.MailboxEvents.MailboxDeletion;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.FetchGroup;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.MessageResult;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mime4j.codec.DecodeMonitor;
import org.apache.james.mime4j.dom.Header;
import org.apache.james.mime4j.dom.field.DateTimeField;
import org.apache.james.mime4j.field.DateTimeFieldLenientImpl;
import org.apache.james.mime4j.message.DefaultMessageBuilder;
import org.apache.james.mime4j.stream.MimeConfig;
import org.apache.james.util.FunctionalUtils;
import org.reactivestreams.Publisher;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class PopulateEmailQueryViewListener implements ReactiveGroupEventListener {
public static class PopulateEmailQueryViewListenerGroup extends Group {
}
static final Group GROUP = new PopulateEmailQueryViewListenerGroup();
private static final int CONCURRENCY = 5;
private final MessageIdManager messageIdManager;
private final EmailQueryView view;
private final SessionProvider sessionProvider;
@Inject
public PopulateEmailQueryViewListener(MessageIdManager messageIdManager, EmailQueryView view, SessionProvider sessionProvider) {
this.messageIdManager = messageIdManager;
this.view = view;
this.sessionProvider = sessionProvider;
}
@Override
public Group getDefaultGroup() {
return GROUP;
}
@Override
public boolean isHandling(Event event) {
return event instanceof Added
|| event instanceof FlagsUpdated
|| event instanceof Expunged
|| event instanceof MailboxDeletion;
}
@Override
public Publisher<Void> reactiveEvent(Event event) {
if (event instanceof Added) {
return handleAdded((Added) event);
}
if (event instanceof Expunged) {
return handleExpunged((Expunged) event);
}
if (event instanceof FlagsUpdated) {
return handleFlagsUpdated((FlagsUpdated) event);
}
if (event instanceof MailboxDeletion) {
return handleMailboxDeletion((MailboxDeletion) event);
}
return Mono.empty();
}
private Publisher<Void> handleMailboxDeletion(MailboxDeletion mailboxDeletion) {
return view.delete(mailboxDeletion.getMailboxId());
}
private Publisher<Void> handleExpunged(Expunged expunged) {
return Flux.fromStream(expunged.getUids().stream()
.map(uid -> expunged.getMetaData(uid).getMessageId()))
.concatMap(messageId -> view.delete(expunged.getMailboxId(), messageId))
.then();
}
private Publisher<Void> handleFlagsUpdated(FlagsUpdated flagsUpdated) {
MailboxSession session = sessionProvider.createSystemSession(flagsUpdated.getUsername());
Mono<Void> removeMessagesMarkedAsDeleted = Flux.fromIterable(flagsUpdated.getUpdatedFlags())
.filter(updatedFlags -> updatedFlags.isModifiedToSet(DELETED))
.map(UpdatedFlags::getMessageId)
.handle(publishIfPresent())
.concatMap(messageId -> view.delete(flagsUpdated.getMailboxId(), messageId))
.then();
Mono<Void> addMessagesNoLongerMarkedAsDeleted = Flux.fromIterable(flagsUpdated.getUpdatedFlags())
.filter(updatedFlags -> updatedFlags.isModifiedToUnset(DELETED))
.map(UpdatedFlags::getMessageId)
.handle(publishIfPresent())
.concatMap(messageId ->
Flux.from(messageIdManager.getMessagesReactive(ImmutableList.of(messageId), FetchGroup.HEADERS, session))
.next())
.concatMap(message -> handleAdded(flagsUpdated.getMailboxId(), message))
.then();
return removeMessagesMarkedAsDeleted
.then(addMessagesNoLongerMarkedAsDeleted);
}
private Mono<Void> handleAdded(Added added) {
MailboxSession session = sessionProvider.createSystemSession(added.getUsername());
return Flux.fromStream(added.getUids().stream()
.map(added::getMetaData))
.flatMap(messageMetaData -> handleAdded(added, messageMetaData, session), CONCURRENCY)
.then();
}
private Mono<Void> handleAdded(Added added, MessageMetaData messageMetaData, MailboxSession session) {
MessageId messageId = messageMetaData.getMessageId();
MailboxId mailboxId = added.getMailboxId();
Mono<Void> doHandleAdded = Flux.from(messageIdManager.getMessagesReactive(ImmutableList.of(messageId), FetchGroup.HEADERS, session))
.next()
.filter(message -> !message.getFlags().contains(DELETED))
.flatMap(messageResult -> handleAdded(added.getMailboxId(), messageResult));
if (Role.from(added.getMailboxPath().getName()).equals(Optional.of(Role.OUTBOX))) {
return checkMessageStillInOriginMailbox(messageId, session, mailboxId)
.filter(FunctionalUtils.identityPredicate())
.flatMap(stillInOriginMailbox -> doHandleAdded);
}
return doHandleAdded;
}
private Mono<Boolean> checkMessageStillInOriginMailbox(MessageId messageId, MailboxSession session, MailboxId targetMailboxId) {
return Flux.from(messageIdManager.messageMetadata(messageId, session))
.filter(composedMessageIdWithMetaData -> composedMessageIdWithMetaData.getComposedMessageId().getMailboxId().equals(targetMailboxId))
.hasElements();
}
public Mono<Void> handleAdded(MailboxId mailboxId, MessageResult messageResult) {
ZonedDateTime receivedAt = ZonedDateTime.ofInstant(messageResult.getInternalDate().toInstant(), ZoneOffset.UTC);
return Mono.fromCallable(() -> parseMessage(messageResult))
.map(header -> date(header).orElse(messageResult.getInternalDate()))
.map(date -> ZonedDateTime.ofInstant(date.toInstant(), ZoneOffset.UTC))
.flatMap(sentAt -> view.save(mailboxId, sentAt, receivedAt, messageResult.getMessageId()))
.then();
}
private Header parseMessage(MessageResult messageResult) throws IOException, MailboxException {
DefaultMessageBuilder defaultMessageBuilder = new DefaultMessageBuilder();
defaultMessageBuilder.setMimeEntityConfig(MimeConfig.PERMISSIVE);
return defaultMessageBuilder.parseHeader(messageResult.getFullContent().getInputStream());
}
private Optional<Date> date(Header header) {
return Optional.ofNullable(header.getField("Date"))
.map(field -> DateTimeFieldLenientImpl.PARSER.parse(field, DecodeMonitor.SILENT))
.map(DateTimeField::getDate);
}
}