blob: 25af4c3559434597eb23defc9107768bf39c4d76 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailbox.cassandra;
import static org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration.ConsistencyChoice;
import static org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration.ConsistencyChoice.STRONG;
import static org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration.ConsistencyChoice.WEAK;
import static org.apache.james.util.FunctionalUtils.negate;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
import java.util.Optional;
import java.util.function.Predicate;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.events.Event;
import org.apache.james.events.EventListener;
import org.apache.james.events.Group;
import org.apache.james.mailbox.acl.ACLDiff;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.CassandraACLMapper;
import org.apache.james.mailbox.cassandra.mail.CassandraApplicableFlagDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentOwnerDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraDeletedMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraFirstUnseenDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
import org.apache.james.mailbox.events.MailboxEvents.Expunged;
import org.apache.james.mailbox.events.MailboxEvents.MailboxDeletion;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.util.streams.Limit;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* This listener cleans Cassandra metadata up. It retrieves dandling unreferenced metadata after the delete operation
* had been conducted out. Then it deletes the lower levels first so that upon failures undeleted metadata can still be
* reached.
*
* This cleanup is not needed for strict correctness from a MailboxManager point of view thus it could be carried out
* asynchronously, via mailbox listeners so that it can be retried.
*
* Mailbox listener failures lead to eventBus retrying their execution, it ensures the result of the deletion to be
* idempotent.
*/
public class DeleteMessageListener implements EventListener.ReactiveGroupEventListener {
private static final Optional<CassandraId> ALL_MAILBOXES = Optional.empty();
public static class DeleteMessageListenerGroup extends Group {
}
private final CassandraMessageIdToImapUidDAO imapUidDAO;
private final CassandraMessageIdDAO messageIdDAO;
private final CassandraMessageDAO messageDAO;
private final CassandraMessageDAOV3 messageDAOV3;
private final CassandraAttachmentDAOV2 attachmentDAO;
private final CassandraAttachmentOwnerDAO ownerDAO;
private final CassandraAttachmentMessageIdDAO attachmentMessageIdDAO;
private final CassandraACLMapper aclMapper;
private final CassandraUserMailboxRightsDAO rightsDAO;
private final CassandraApplicableFlagDAO applicableFlagDAO;
private final CassandraFirstUnseenDAO firstUnseenDAO;
private final CassandraDeletedMessageDAO deletedMessageDAO;
private final CassandraMailboxCounterDAO counterDAO;
private final CassandraMailboxRecentsDAO recentsDAO;
private final BlobStore blobStore;
private final CassandraConfiguration cassandraConfiguration;
@Inject
public DeleteMessageListener(CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
CassandraMessageDAOV3 messageDAOV3, CassandraAttachmentDAOV2 attachmentDAO, CassandraAttachmentOwnerDAO ownerDAO,
CassandraAttachmentMessageIdDAO attachmentMessageIdDAO, CassandraACLMapper aclMapper,
CassandraUserMailboxRightsDAO rightsDAO, CassandraApplicableFlagDAO applicableFlagDAO,
CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO,
CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore,
CassandraConfiguration cassandraConfiguration) {
this.imapUidDAO = imapUidDAO;
this.messageIdDAO = messageIdDAO;
this.messageDAO = messageDAO;
this.messageDAOV3 = messageDAOV3;
this.attachmentDAO = attachmentDAO;
this.ownerDAO = ownerDAO;
this.attachmentMessageIdDAO = attachmentMessageIdDAO;
this.aclMapper = aclMapper;
this.rightsDAO = rightsDAO;
this.applicableFlagDAO = applicableFlagDAO;
this.firstUnseenDAO = firstUnseenDAO;
this.deletedMessageDAO = deletedMessageDAO;
this.counterDAO = counterDAO;
this.recentsDAO = recentsDAO;
this.blobStore = blobStore;
this.cassandraConfiguration = cassandraConfiguration;
}
@Override
public Group getDefaultGroup() {
return new DeleteMessageListenerGroup();
}
@Override
public boolean isHandling(Event event) {
return event instanceof Expunged || event instanceof MailboxDeletion;
}
@Override
public Publisher<Void> reactiveEvent(Event event) {
if (event instanceof Expunged) {
Expunged expunged = (Expunged) event;
return handleMessageDeletion(expunged);
}
if (event instanceof MailboxDeletion) {
MailboxDeletion mailboxDeletion = (MailboxDeletion) event;
CassandraId mailboxId = (CassandraId) mailboxDeletion.getMailboxId();
return handleMailboxDeletion(mailboxId);
}
return Mono.empty();
}
private Mono<Void> handleMailboxDeletion(CassandraId mailboxId) {
int prefetch = 1;
return Flux.mergeDelayError(prefetch,
messageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited())
.map(ComposedMessageIdWithMetaData::getComposedMessageId)
.concatMap(metadata -> handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) metadata.getMessageId(), mailboxId)
.then(imapUidDAO.delete((CassandraMessageId) metadata.getMessageId(), mailboxId))
.then(messageIdDAO.delete(mailboxId, metadata.getUid()))),
deleteAcl(mailboxId),
applicableFlagDAO.delete(mailboxId),
firstUnseenDAO.removeAll(mailboxId),
deletedMessageDAO.removeAll(mailboxId),
counterDAO.delete(mailboxId),
recentsDAO.delete(mailboxId))
.then();
}
private Mono<Void> handleMessageDeletion(Expunged expunged) {
return Flux.fromIterable(expunged.getExpunged()
.values())
.map(MessageMetaData::getMessageId)
.map(CassandraMessageId.class::cast)
.concatMap(this::handleMessageDeletion)
.then();
}
private Mono<Void> deleteAcl(CassandraId mailboxId) {
return aclMapper.getACL(mailboxId)
.flatMap(acl -> rightsDAO.update(mailboxId, ACLDiff.computeDiff(acl, MailboxACL.EMPTY))
.then(aclMapper.delete(mailboxId)));
}
private Mono<Void> handleMessageDeletion(CassandraMessageId messageId) {
return Mono.just(messageId)
.filterWhen(this::isReferenced)
.flatMap(id -> readMessage(id)
.flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
.flatMap(this::deleteMessageBlobs)
.flatMap(this::deleteAttachmentMessageIds)
.then(messageDAO.delete(messageId))
.then(messageDAOV3.delete(messageId)));
}
private Mono<Void> handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, CassandraId excludedId) {
return Mono.just(messageId)
.filterWhen(id -> isReferenced(id, excludedId))
.flatMap(id -> readMessage(id)
.flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
.flatMap(this::deleteMessageBlobs)
.flatMap(this::deleteAttachmentMessageIds)
.then(messageDAO.delete(messageId))
.then(messageDAOV3.delete(messageId)));
}
private Mono<MessageRepresentation> deleteMessageBlobs(MessageRepresentation message) {
return Flux.merge(
blobStore.delete(blobStore.getDefaultBucketName(), message.getHeaderId()),
blobStore.delete(blobStore.getDefaultBucketName(), message.getBodyId()))
.then()
.thenReturn(message);
}
private Mono<MessageRepresentation> readMessage(CassandraMessageId id) {
return messageDAOV3.retrieveMessage(id, MessageMapper.FetchType.Metadata)
.switchIfEmpty(messageDAO.retrieveMessage(id, MessageMapper.FetchType.Metadata));
}
private Mono<Void> deleteUnreferencedAttachments(MessageRepresentation message) {
return Flux.fromIterable(message.getAttachments())
.filterWhen(attachment -> ownerDAO.retrieveOwners(attachment.getAttachmentId()).hasElements().map(negate()), DEFAULT_CONCURRENCY)
.filterWhen(attachment -> hasOtherMessagesReferences(message, attachment), DEFAULT_CONCURRENCY)
.concatMap(attachment -> attachmentDAO.getAttachment(attachment.getAttachmentId())
.map(CassandraAttachmentDAOV2.DAOAttachment::getBlobId)
.flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId)))
.then(attachmentDAO.delete(attachment.getAttachmentId())))
.then();
}
private Mono<Void> deleteAttachmentMessageIds(MessageRepresentation message) {
return Flux.fromIterable(message.getAttachments())
.concatMap(attachment -> attachmentMessageIdDAO.delete(attachment.getAttachmentId(), message.getMessageId()))
.then();
}
private Mono<Boolean> hasOtherMessagesReferences(MessageRepresentation message, MessageAttachmentRepresentation attachment) {
return attachmentMessageIdDAO.getOwnerMessageIds(attachment.getAttachmentId())
.filter(Predicate.not(Predicate.isEqual(message.getMessageId())))
.hasElements()
.map(negate());
}
private Mono<Boolean> isReferenced(CassandraMessageId id) {
return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites())
.hasElements()
.map(negate());
}
private Mono<Boolean> isReferenced(CassandraMessageId id, CassandraId excludedId) {
return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites())
.filter(metadata -> !metadata.getComposedMessageId().getMailboxId().equals(excludedId))
.hasElements()
.map(negate());
}
private ConsistencyChoice chooseReadConsistencyUponWrites() {
if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
return STRONG;
}
return WEAK;
}
}