blob: e5db7eafb1b6a156730ec47df1bbc023232b0339 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailbox.cassandra.mail;
import static org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration.ConsistencyChoice.STRONG;
import static org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration.ConsistencyChoice.WEAK;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import javax.mail.Flags;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration.ConsistencyChoice;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.exception.MailboxNotFoundException;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.FlagsUpdateCalculator;
import org.apache.james.mailbox.store.MailboxReactorUtils;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.MessageIdMapper;
import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.util.FunctionalUtils;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.Multimap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
public class CassandraMessageIdMapper implements MessageIdMapper {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
private static final int MAX_RETRY = 5;
private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(10);
private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(1000);
private final MailboxMapper mailboxMapper;
private final CassandraMailboxDAO mailboxDAO;
private final CassandraMessageIdToImapUidDAO imapUidDAO;
private final CassandraMessageIdDAO messageIdDAO;
private final CassandraMessageDAO messageDAO;
private final CassandraMessageDAOV3 messageDAOV3;
private final CassandraIndexTableHandler indexTableHandler;
private final CassandraModSeqProvider modSeqProvider;
private final AttachmentLoader attachmentLoader;
private final CassandraConfiguration cassandraConfiguration;
public CassandraMessageIdMapper(MailboxMapper mailboxMapper, CassandraMailboxDAO mailboxDAO, CassandraAttachmentMapper attachmentMapper,
CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO,
CassandraMessageDAO messageDAO, CassandraMessageDAOV3 messageDAOV3, CassandraIndexTableHandler indexTableHandler,
CassandraModSeqProvider modSeqProvider, CassandraConfiguration cassandraConfiguration) {
this.mailboxMapper = mailboxMapper;
this.mailboxDAO = mailboxDAO;
this.imapUidDAO = imapUidDAO;
this.messageIdDAO = messageIdDAO;
this.messageDAO = messageDAO;
this.messageDAOV3 = messageDAOV3;
this.indexTableHandler = indexTableHandler;
this.modSeqProvider = modSeqProvider;
this.attachmentLoader = new AttachmentLoader(attachmentMapper);
this.cassandraConfiguration = cassandraConfiguration;
}
@Override
public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) {
return findReactive(messageIds, fetchType)
.collectList()
.block();
}
@Override
public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, FetchType fetchType) {
return Flux.fromIterable(messageIds)
.flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty(), chooseReadConsistency()),
cassandraConfiguration.getMessageReadChunkSize())
.flatMap(composedMessageId -> messageDAOV3.retrieveMessage(composedMessageId, fetchType)
.switchIfEmpty(messageDAO.retrieveMessage(composedMessageId, fetchType))
.map(messageRepresentation -> Pair.of(composedMessageId, messageRepresentation)), cassandraConfiguration.getMessageReadChunkSize())
.flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType), cassandraConfiguration.getMessageReadChunkSize())
.groupBy(MailboxMessage::getMailboxId)
.flatMap(this::keepMessageIfMailboxExists, ReactorUtils.DEFAULT_CONCURRENCY);
}
@Override
public Publisher<ComposedMessageIdWithMetaData> findMetadata(MessageId messageId) {
return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty(), chooseReadConsistency());
}
private Flux<MailboxMessage> keepMessageIfMailboxExists(GroupedFlux<MailboxId, MailboxMessage> groupedFlux) {
CassandraId cassandraId = (CassandraId) groupedFlux.key();
return mailboxDAO.retrieveMailbox(cassandraId)
.flatMapMany(any -> groupedFlux)
.switchIfEmpty(groupedFlux.map(message -> {
LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.",
cassandraId.serialize(),
message.getMessageId().serialize());
return message;
}).then(Mono.empty()));
}
@Override
public List<MailboxId> findMailboxes(MessageId messageId) {
return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty(), chooseReadConsistency())
.map(ComposedMessageIdWithMetaData::getComposedMessageId)
.map(ComposedMessageId::getMailboxId)
.collectList()
.block();
}
public ConsistencyChoice chooseReadConsistency() {
if (cassandraConfiguration.isMessageReadStrongConsistency()) {
return STRONG;
} else {
return WEAK;
}
}
private ConsistencyChoice chooseReadConsistencyUponWrites() {
if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
return STRONG;
}
return WEAK;
}
@Override
public void save(MailboxMessage mailboxMessage) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
MailboxReactorUtils.block(mailboxMapper.findMailboxById(mailboxId)
.switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(mailboxId)))
.then(messageDAOV3.save(mailboxMessage))
.thenEmpty(saveMessageMetadata(mailboxMessage, mailboxId)));
}
@Override
public void copyInMailbox(MailboxMessage mailboxMessage, Mailbox mailbox) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
MailboxReactorUtils.block(saveMessageMetadata(mailboxMessage, mailboxId));
}
private Mono<Void> saveMessageMetadata(MailboxMessage mailboxMessage, CassandraId mailboxId) {
ComposedMessageIdWithMetaData composedMessageIdWithMetaData = createMetadataFor(mailboxMessage);
return imapUidDAO.insert(composedMessageIdWithMetaData)
.thenEmpty(Flux.merge(
messageIdDAO.insert(composedMessageIdWithMetaData)
.retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)),
indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
.then());
}
private ComposedMessageIdWithMetaData createMetadataFor(MailboxMessage mailboxMessage) {
ComposedMessageId composedMessageId = new ComposedMessageId(
mailboxMessage.getMailboxId(),
mailboxMessage.getMessageId(),
mailboxMessage.getUid());
return ComposedMessageIdWithMetaData.builder()
.composedMessageId(composedMessageId)
.flags(mailboxMessage.createFlags())
.modSeq(mailboxMessage.getModSeq())
.build();
}
@Override
public void delete(MessageId messageId, Collection<MailboxId> mailboxIds) {
deleteReactive(messageId, mailboxIds).block();
}
@Override
public Mono<Void> deleteReactive(MessageId messageId, Collection<MailboxId> mailboxIds) {
CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId;
return Flux.fromStream(mailboxIds.stream())
.flatMap(mailboxId -> retrieveAndDeleteIndices(cassandraMessageId, Optional.of((CassandraId) mailboxId)),
DEFAULT_CONCURRENCY)
.then();
}
@Override
public void delete(Multimap<MessageId, MailboxId> ids) {
deleteReactive(ids)
.block();
}
public Mono<Void> deleteReactive(Multimap<MessageId, MailboxId> ids) {
return Flux.fromIterable(ids.asMap()
.entrySet())
.publishOn(Schedulers.elastic())
.flatMap(entry -> deleteReactive(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize(),
DEFAULT_CONCURRENCY)
.then();
}
private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
return imapUidDAO.retrieve(messageId, mailboxId, chooseReadConsistencyUponWrites())
.flatMap(this::deleteIds, ReactorUtils.DEFAULT_CONCURRENCY)
.then();
}
@Override
public void delete(MessageId messageId) {
CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId;
retrieveAndDeleteIndices(cassandraMessageId, Optional.empty())
.block();
}
private Mono<Void> deleteIds(ComposedMessageIdWithMetaData metaData) {
CassandraMessageId messageId = (CassandraMessageId) metaData.getComposedMessageId().getMessageId();
CassandraId mailboxId = (CassandraId) metaData.getComposedMessageId().getMailboxId();
return Flux.merge(
imapUidDAO.delete(messageId, mailboxId),
messageIdDAO.delete(mailboxId, metaData.getComposedMessageId().getUid()))
.then(indexTableHandler.updateIndexOnDelete(metaData, mailboxId));
}
@Override
public Mono<Multimap<MailboxId, UpdatedFlags>> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
return Flux.fromIterable(mailboxIds)
.distinct()
.map(mailboxId -> (CassandraId) mailboxId)
.concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
.flatMap(this::updateCounts, ReactorUtils.DEFAULT_CONCURRENCY)
.collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight));
}
private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
return Mono.defer(() -> updateFlags(mailboxId, messageId, newState, updateMode))
.retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
.onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
return Mono.empty();
})
.flux()
.flatMapIterable(Function.identity())
.map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
}
private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) {
return Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(),
UpdatedFlags.builder()
.uid(composedMessageIdWithMetaData.getComposedMessageId().getUid())
.messageId(composedMessageIdWithMetaData.getComposedMessageId().getMessageId())
.modSeq(composedMessageIdWithMetaData.getModSeq())
.oldFlags(oldFlags)
.newFlags(composedMessageIdWithMetaData.getFlags())
.build());
}
private Mono<Pair<MailboxId, UpdatedFlags>> updateCounts(Pair<MailboxId, UpdatedFlags> pair) {
CassandraId cassandraId = (CassandraId) pair.getLeft();
return indexTableHandler.updateIndexOnFlagsUpdate(cassandraId, pair.getRight())
.thenReturn(pair);
}
private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) {
CassandraId cassandraId = (CassandraId) mailboxId;
return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), chooseReadConsistencyUponWrites())
.flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY)
.switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
.collectList();
}
private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) {
Flags newFlags = new FlagsUpdateCalculator(newState, updateMode).buildNewFlags(oldComposedId.getFlags());
if (identicalFlags(oldComposedId, newFlags)) {
return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId));
} else {
return modSeqProvider.nextModSeq(cassandraId)
.map(modSeq -> new ComposedMessageIdWithMetaData(
oldComposedId.getComposedMessageId(),
newFlags,
modSeq))
.flatMap(newComposedId -> updateFlags(oldComposedId, newComposedId));
}
}
private boolean identicalFlags(ComposedMessageIdWithMetaData oldComposedId, Flags newFlags) {
return oldComposedId.getFlags().equals(newFlags);
}
private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) {
return imapUidDAO.updateMetadata(newComposedId, oldComposedId.getModSeq())
.filter(FunctionalUtils.identityPredicate())
.flatMap(any -> messageIdDAO.updateMetadata(newComposedId)
.thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId)))
.single();
}
}