// blob: 695c38f8e4b8dad054fe05e114c8d77244db0dd9 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailbox.cassandra.mail;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.mail.Flags;
import javax.mail.Flags.Flag;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.mailbox.ApplicableFlagBuilder;
import org.apache.james.mailbox.FlagsBuilder;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.utils.FlagsUpdateStageResult;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxCounters;
import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.FlagsUpdateCalculator;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.util.OptionalUtils;
import org.apache.james.util.ReactorUtils;
import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
 * {@link MessageMapper} implementation delegating to a set of Cassandra DAOs.
 *
 * <p>The DAOs expose reactive types; most public methods of this class build a
 * reactive pipeline and then block on it in order to honour the synchronous
 * {@link MessageMapper} contract.</p>
 */
public class CassandraMessageMapper implements MessageMapper {
    public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class);

    private static final String FLAG_INDEX_UPDATE_FAILURE =
        "Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables";

    private final CassandraModSeqProvider modSeqProvider;
    private final CassandraUidProvider uidProvider;
    private final CassandraMessageDAO messageDAO;
    private final CassandraMessageIdDAO messageIdDAO;
    private final CassandraMessageIdToImapUidDAO imapUidDAO;
    private final CassandraMailboxCounterDAO mailboxCounterDAO;
    private final CassandraMailboxRecentsDAO mailboxRecentDAO;
    private final CassandraApplicableFlagDAO applicableFlagDAO;
    private final CassandraIndexTableHandler indexTableHandler;
    private final CassandraFirstUnseenDAO firstUnseenDAO;
    private final AttachmentLoader attachmentLoader;
    private final CassandraDeletedMessageDAO deletedMessageDAO;
    private final CassandraConfiguration cassandraConfiguration;

    /**
     * Wires the mapper with its collaborators. The attachment mapper is wrapped
     * into an {@link AttachmentLoader}; every other argument is stored as is.
     */
    public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider,
                                  CassandraAttachmentMapper attachmentMapper,
                                  CassandraMessageDAO messageDAO, CassandraMessageIdDAO messageIdDAO,
                                  CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO,
                                  CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraApplicableFlagDAO applicableFlagDAO,
                                  CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO,
                                  CassandraDeletedMessageDAO deletedMessageDAO, CassandraConfiguration cassandraConfiguration) {
        this.uidProvider = uidProvider;
        this.modSeqProvider = modSeqProvider;
        this.messageDAO = messageDAO;
        this.messageIdDAO = messageIdDAO;
        this.imapUidDAO = imapUidDAO;
        this.mailboxCounterDAO = mailboxCounterDAO;
        this.mailboxRecentDAO = mailboxRecentDAO;
        this.indexTableHandler = indexTableHandler;
        this.firstUnseenDAO = firstUnseenDAO;
        this.attachmentLoader = new AttachmentLoader(attachmentMapper);
        this.applicableFlagDAO = applicableFlagDAO;
        this.deletedMessageDAO = deletedMessageDAO;
        this.cassandraConfiguration = cassandraConfiguration;
    }

    /**
     * Lists the UIDs of every message referenced by the message-id table for
     * the given mailbox (range {@link MessageRange#all()}).
     */
    @Override
    public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) {
        CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
        return messageIdDAO.retrieveMessages(cassandraId, MessageRange.all())
            .map(metaData -> metaData.getComposedMessageId().getUid())
            .toIterable()
            .iterator();
    }

    /** Returns the message count taken from the mailbox counters. */
    @Override
    public long countMessagesInMailbox(Mailbox mailbox) {
        return getMailboxCounters(mailbox).getCount();
    }

    /** Returns the unseen message count taken from the mailbox counters. */
    @Override
    public long countUnseenMessagesInMailbox(Mailbox mailbox) {
        return getMailboxCounters(mailbox).getUnseen();
    }

    /** Blocking variant of {@link #getMailboxCountersAsMono(Mailbox)}. */
    @Override
    public MailboxCounters getMailboxCounters(Mailbox mailbox) {
        return getMailboxCountersAsMono(mailbox).block();
    }

    /**
     * Reads the counters of a mailbox, falling back to zeroed counters when
     * the DAO emits nothing for that mailbox.
     */
    private Mono<MailboxCounters> getMailboxCountersAsMono(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return mailboxCounterDAO.retrieveMailboxCounters(mailboxId)
            .defaultIfEmpty(MailboxCounters.builder()
                .mailboxId(mailboxId)
                .count(0)
                .unseen(0)
                .build());
    }

    /**
     * Reads the counters of several mailboxes. Mailboxes are processed one
     * after the other ({@code concatMap}), so the result follows the iteration
     * order of {@code mailboxes}.
     */
    @Override
    public List<MailboxCounters> getMailboxCounters(Collection<Mailbox> mailboxes) {
        return Flux.fromIterable(mailboxes)
            .publishOn(Schedulers.elastic())
            .concatMap(this::getMailboxCountersAsMono)
            .toStream()
            .collect(Guavate.toImmutableList());
    }

    /**
     * Deletes the references to {@code message} and updates the index tables.
     * The {@code mailbox} argument is not read: the mailbox id comes from the
     * composed id carried by the message itself.
     */
    @Override
    public void delete(Mailbox mailbox, MailboxMessage message) {
        deleteAsFuture(message)
            .block();
    }

    private Mono<Void> deleteAsFuture(MailboxMessage message) {
        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = message.getComposedMessageIdWithMetaData();
        return deleteUsingMailboxId(composedMessageIdWithMetaData);
    }

    /**
     * Removes the entry from both id tables (concurrently), then lets the
     * index handler react to the deletion.
     */
    private Mono<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
        ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
        CassandraMessageId messageId = (CassandraMessageId) composedMessageId.getMessageId();
        CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
        MessageUid uid = composedMessageId.getUid();
        return Flux.merge(
                imapUidDAO.delete(messageId, mailboxId),
                messageIdDAO.delete(mailboxId, uid))
            .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
    }

    /**
     * Finds the messages of a mailbox within {@code messageRange}, loaded
     * according to {@code ftype} and limited by {@code max}. The returned
     * iterator is ordered by UID.
     */
    @Override
    public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max))
            .map(MailboxMessage.class::cast)
            .sort(Comparator.comparing(MailboxMessage::getUid))
            .toIterable()
            .iterator();
    }

    /** Collects (blocking) the composed ids stored for the mailbox in the given range. */
    private List<ComposedMessageIdWithMetaData> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) {
        return messageIdDAO.retrieveMessages(mailboxId, messageRange)
            .collect(Guavate.toImmutableList())
            .block();
    }

    /**
     * Loads the messages behind the given ids, dropping the ones the message
     * DAO reports as not found, and attaches attachments through the
     * {@link AttachmentLoader}.
     */
    private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
        return messageDAO.retrieveMessages(messageIds, fetchType, limit)
            .filter(CassandraMessageDAO.MessageResult::isFound)
            .map(CassandraMessageDAO.MessageResult::message)
            .flatMap(stream -> attachmentLoader.addAttachmentToMessage(stream, fetchType));
    }

    /** Returns the UIDs recorded as recent for the mailbox. */
    @Override
    public List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId)
            .collectList()
            .block();
    }

    /**
     * Returns the first unread UID of the mailbox, or {@code null} when the
     * DAO emits none.
     */
    @Override
    public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return firstUnseenDAO.retrieveFirstUnread(mailboxId)
            .blockOptional()
            .orElse(null);
    }

    /** Returns the UIDs recorded as deleted for the mailbox within the given range. */
    @Override
    public List<MessageUid> retrieveMessagesMarkedForDeletion(Mailbox mailbox, MessageRange messageRange) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)
            .collect(Guavate.toImmutableList())
            .block();
    }

    /**
     * Expunges the given UIDs from the mailbox and returns the metadata of the
     * messages that were effectively removed, keyed by UID. UIDs without a
     * matching entry are logged and left out of the result. The number of
     * expunges running concurrently is bounded by the configured expunge
     * chunk size.
     */
    @Override
    public Map<MessageUid, MessageMetaData> deleteMessages(Mailbox mailbox, List<MessageUid> uids) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return Flux.fromIterable(uids)
            .flatMap(messageUid -> expungeOne(mailboxId, messageUid), cassandraConfiguration.getExpungeChunkSize())
            .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
            .block();
    }

    /**
     * Deletes the references of one message, then loads its metadata (no
     * attachments) so the caller can report what was expunged.
     */
    private Flux<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) {
        return retrieveComposedId(mailboxId, messageUid)
            .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
            .flatMapMany(idWithMetadata ->
                messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
            .filter(CassandraMessageDAO.MessageResult::isFound)
            .map(CassandraMessageDAO.MessageResult::message)
            .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));
    }

    /**
     * Looks up the composed id stored for {@code uid}; logs a warning and
     * completes empty when there is none.
     */
    private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId mailboxId, MessageUid uid) {
        return messageIdDAO.retrieve(mailboxId, uid)
            .doOnNext(optional -> OptionalUtils.executeIfEmpty(optional,
                () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)))
            .handle((t, sink) -> t.ifPresent(sink::next));
    }

    /**
     * Moves a message by copying it into {@code destinationMailbox} and then
     * deleting the original references.
     *
     * @return the metadata of the copy
     * @throws MailboxException when the copy fails
     */
    @Override
    public MessageMetaData move(Mailbox destinationMailbox, MailboxMessage original) throws MailboxException {
        // Captured before copy(): copy() overwrites the flags, UID and ModSeq of 'original'.
        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = original.getComposedMessageIdWithMetaData();
        MessageMetaData messageMetaData = copy(destinationMailbox, original);
        deleteUsingMailboxId(composedMessageIdWithMetaData).block();
        return messageMetaData;
    }

    @Override
    public void endRequest() {
        // Do nothing
    }

    /** Delegates to the ModSeq provider. */
    @Override
    public ModSeq getHighestModSeq(Mailbox mailbox) throws MailboxException {
        return modSeqProvider.highestModSeq(mailbox);
    }

    /**
     * Adds a message to a mailbox: assigns it a UID and a ModSeq, saves it
     * along with its id entries, then updates the index tables.
     *
     * @return the metadata of {@code message} once UID and ModSeq are set
     * @throws MailboxException when no UID or ModSeq could be obtained
     */
    @Override
    public MessageMetaData add(Mailbox mailbox, MailboxMessage message) throws MailboxException {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        save(mailbox, addUidAndModseq(message, mailboxId))
            .thenEmpty(indexTableHandler.updateIndexOnAdd(message, mailboxId))
            .block();
        return message.metaData();
    }

    /**
     * Requests a new UID and a new ModSeq for the mailbox and sets both on
     * {@code message} (which is mutated and returned).
     *
     * @throws MailboxException when either provider emits no value
     */
    private MailboxMessage addUidAndModseq(MailboxMessage message, CassandraId mailboxId) throws MailboxException {
        Mono<MessageUid> messageUidMono = uidProvider
            .nextUid(mailboxId)
            .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
        Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId)
            .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));

        try {
            Mono.zip(messageUidMono, nextModSeqMono)
                .doOnNext(tuple -> {
                    message.setUid(tuple.getT1());
                    message.setModSeq(tuple.getT2());
                })
                .block();
        } catch (RuntimeException e) {
            // block() surfaces checked exceptions wrapped in a RuntimeException: unwrap ours.
            if (e.getCause() instanceof MailboxException) {
                throw (MailboxException) e.getCause();
            }
            throw e;
        }
        return message;
    }

    /**
     * Applies {@code flagUpdateCalculator} to every message of {@code range}.
     * Updates that fail are retried up to the configured maximum; UIDs still
     * failing afterwards are logged and left out of the result.
     *
     * @return the flag updates that succeeded
     * @throws RuntimeException when a new ModSeq can not be generated
     */
    @Override
    public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange range) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();

        Flux<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, range);

        FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator).block();
        FlagsUpdateStageResult finalResult = handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult);
        if (finalResult.containsFailedResults()) {
            LOGGER.error("Can not update following UIDs {} for mailbox {}", finalResult.getFailed(), mailboxId.asUuid());
        }
        return finalResult.getSucceeded().iterator();
    }

    /**
     * Re-runs the update stage on the failed UIDs until none is left or the
     * configured retry budget is exhausted, accumulating successes.
     */
    private FlagsUpdateStageResult handleUpdatesStagedRetry(CassandraId mailboxId, FlagsUpdateCalculator flagUpdateCalculator, FlagsUpdateStageResult firstResult) {
        FlagsUpdateStageResult globalResult = firstResult;
        int retryCount = 0;
        while (retryCount < cassandraConfiguration.getFlagsUpdateMessageMaxRetry() && globalResult.containsFailedResults()) {
            retryCount++;
            FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed()).block();
            globalResult = globalResult.keepSucceded().merge(stageResult);
        }
        return globalResult;
    }

    /**
     * Reloads the current metadata of the failed UIDs and runs a new update
     * stage on them. With nothing to retry, an empty result is emitted so
     * that blocking callers never receive {@code null}.
     */
    private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
        if (failed.isEmpty()) {
            return Mono.just(FlagsUpdateStageResult.none());
        }
        Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
            .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid))
            .handle((t, sink) -> t.ifPresent(sink::next));
        return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
    }

    /**
     * Tries the flag update on each message sequentially, merges the outcomes
     * into a single result, then updates the index tables for the successes.
     */
    private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
        // NOTE(review): this Mono is subscribed once per message below; whether that
        // yields a distinct ModSeq per message depends on the provider - confirm.
        Mono<ModSeq> newModSeq = computeNewModSeq(mailboxId);
        return toBeUpdated
            .concatMap(metadata -> newModSeq.flatMap(modSeq -> tryFlagsUpdate(flagsUpdateCalculator, modSeq, metadata)))
            .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge)
            .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result));
    }

    /**
     * Requests the next ModSeq of the mailbox and fails with a
     * {@link RuntimeException} when the provider emits none, so that a failed
     * generation is reported instead of silently dropping the flag update.
     */
    private Mono<ModSeq> computeNewModSeq(CassandraId mailboxId) {
        return modSeqProvider.nextModSeq(mailboxId)
            .switchIfEmpty(Mono.error(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid())));
    }

    /**
     * Updates the index tables for every successful flag update. Index
     * failures are logged and skipped, whether they are thrown while building
     * the update or signalled later by its publisher; {@code result} is always
     * emitted once all updates have terminated.
     */
    private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
        return Flux.fromIterable(result.getSucceeded())
            .flatMap(Throwing
                .function((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)
                    // Errors signalled asynchronously are not seen by fallbackTo below.
                    .onErrorResume(e -> {
                        LOGGER.error(FLAG_INDEX_UPDATE_FAILURE, mailboxId, updatedFlags.getUid(), e);
                        return Mono.empty();
                    }))
                .fallbackTo(failedIndex -> {
                    LOGGER.error(FLAG_INDEX_UPDATE_FAILURE, mailboxId, failedIndex.getUid());
                    return Mono.empty();
                }))
            .then(Mono.just(result));
    }

    /** Runs the transaction directly; no transactional context is set up here. */
    @Override
    public <T> T execute(Transaction<T> transaction) throws MailboxException {
        return transaction.run();
    }

    /**
     * Copies a message into {@code mailbox}. The {@code RECENT} flag is added
     * to {@code original}, which is mutated and then referenced from the
     * target mailbox under a new UID and ModSeq.
     */
    @Override
    public MessageMetaData copy(Mailbox mailbox, MailboxMessage original) throws MailboxException {
        original.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flag.RECENT).build());
        return setInMailbox(mailbox, original);
    }

    /** Delegates to the UID provider. */
    @Override
    public Optional<MessageUid> getLastUid(Mailbox mailbox) throws MailboxException {
        return uidProvider.lastUid(mailbox);
    }

    /**
     * Builds the applicable flags of the mailbox from the flags stored by the
     * applicable-flag DAO, using empty {@link Flags} when none are stored.
     */
    @Override
    public Flags getApplicableFlag(Mailbox mailbox) {
        return ApplicableFlagBuilder.builder()
            .add(applicableFlagDAO.retrieveApplicableFlag((CassandraId) mailbox.getMailboxId())
                .defaultIfEmpty(new Flags())
                .block())
            .build();
    }

    /**
     * References an existing message from {@code mailbox}: assigns a new UID
     * and ModSeq, inserts the id entries and updates the index tables. Unlike
     * {@link #add(Mailbox, MailboxMessage)}, the message itself is not saved
     * through the message DAO.
     */
    private MessageMetaData setInMailbox(Mailbox mailbox, MailboxMessage message) throws MailboxException {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();

        insertIds(addUidAndModseq(message, mailboxId), mailboxId)
            .thenEmpty(indexTableHandler.updateIndexOnAdd(message, mailboxId))
            .block();
        return message.metaData();
    }

    /** Saves the message through the message DAO, then inserts its id entries. */
    private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return messageDAO.save(message)
            .thenEmpty(insertIds(message, mailboxId));
    }

    /**
     * Inserts the composed id (mailbox id, message id, UID, flags, ModSeq) of
     * the message into both id tables concurrently.
     */
    private Mono<Void> insertIds(MailboxMessage message, CassandraId mailboxId) {
        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder()
            .composedMessageId(new ComposedMessageId(mailboxId, message.getMessageId(), message.getUid()))
            .flags(message.createFlags())
            .modSeq(message.getModSeq())
            .build();
        return Flux.merge(
                messageIdDAO.insert(composedMessageIdWithMetaData),
                imapUidDAO.insert(composedMessageIdWithMetaData))
            .then();
    }

    /**
     * Computes the new flags of one message and tries to store them.
     *
     * <p>When the flags do not change, nothing is written and a success
     * carrying the previous ModSeq is reported. Otherwise the outcome of the
     * write decides between a success carrying {@code newModSeq} and a failure
     * for the UID.</p>
     */
    private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, ModSeq newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
        Flags oldFlags = oldMetaData.getFlags();
        Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
        if (identicalFlags(oldFlags, newFlags)) {
            return Mono.just(FlagsUpdateStageResult.success(UpdatedFlags.builder()
                .uid(oldMetaData.getComposedMessageId().getUid())
                .modSeq(oldMetaData.getModSeq())
                .oldFlags(oldFlags)
                .newFlags(newFlags)
                .build()));
        }

        return updateFlags(oldMetaData, newFlags, newModSeq)
            .map(success -> {
                if (success) {
                    return FlagsUpdateStageResult.success(UpdatedFlags.builder()
                        .uid(oldMetaData.getComposedMessageId().getUid())
                        .modSeq(newModSeq)
                        .oldFlags(oldFlags)
                        .newFlags(newFlags)
                        .build());
                } else {
                    return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId().getUid());
                }
            });
    }

    private boolean identicalFlags(Flags oldFlags, Flags newFlags) {
        return oldFlags.equals(newFlags);
    }

    /**
     * Writes the new flags and ModSeq of a message. The imap-uid table is
     * updated first, given the previous ModSeq; the message-id table is only
     * updated when that first update reports success.
     *
     * @return {@code true} when both tables were updated, {@code false} when
     *         the first update was rejected
     */
    private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, ModSeq newModSeq) {
        ComposedMessageIdWithMetaData newMetadata = ComposedMessageIdWithMetaData.builder()
            .composedMessageId(oldMetadata.getComposedMessageId())
            .modSeq(newModSeq)
            .flags(newFlags)
            .build();
        return imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq())
            .flatMap(success -> {
                if (success) {
                    return messageIdDAO.updateMetadata(newMetadata).thenReturn(true);
                } else {
                    return Mono.just(false);
                }
            });
    }
}