blob: 40b6bf02f4a4e255dcce9116bfa295a0959d1638 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.vault.metadata;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.time.Clock;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.Set;
import jakarta.inject.Inject;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.core.MailAddress;
import org.apache.james.core.MaybeSender;
import org.apache.james.mailbox.cassandra.DeleteMessageListener;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mime4j.MimeIOException;
import org.apache.james.mime4j.codec.DecodeMonitor;
import org.apache.james.mime4j.dom.Message;
import org.apache.james.mime4j.dom.address.Mailbox;
import org.apache.james.mime4j.message.DefaultMessageBuilder;
import org.apache.james.mime4j.stream.MimeConfig;
import org.apache.james.server.core.Envelope;
import org.apache.james.vault.DeletedMessage;
import org.apache.james.vault.DeletedMessageVault;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
public class DeletedMessageVaultDeletionCallback implements DeleteMessageListener.DeletionCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(DeletedMessageVaultDeletionCallback.class);
private final DeletedMessageVault deletedMessageVault;
private final BlobStore blobStore;
private final Clock clock;
@Inject
public DeletedMessageVaultDeletionCallback(DeletedMessageVault deletedMessageVault, BlobStore blobStore, Clock clock) {
this.deletedMessageVault = deletedMessageVault;
this.blobStore = blobStore;
this.clock = clock;
}
@Override
public Mono<Void> forMessage(DeleteMessageListener.DeletedMessageCopyCommand copyCommand) {
return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), copyCommand.getHeaderId(), BlobStore.StoragePolicy.LOW_COST))
.flatMap(bytes -> {
Optional<Message> mimeMessage = parseMessage(new ByteArrayInputStream(bytes), copyCommand.getMessageId());
DeletedMessage deletedMessage = DeletedMessage.builder()
.messageId(copyCommand.getMessageId())
.originMailboxes(copyCommand.getMailboxId())
.user(copyCommand.getOwner())
.deliveryDate(ZonedDateTime.ofInstant(copyCommand.getInternalDate().toInstant(), ZoneOffset.UTC))
.deletionDate(ZonedDateTime.ofInstant(clock.instant(), ZoneOffset.UTC))
.sender(retrieveSender(mimeMessage))
.recipients(retrieveRecipients(mimeMessage))
.hasAttachment(copyCommand.hasAttachments())
.size(copyCommand.getSize())
.subject(mimeMessage.map(Message::getSubject))
.build();
return Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), copyCommand.getBodyId(), BlobStore.StoragePolicy.LOW_COST))
.map(bodyStream -> new SequenceInputStream(new ByteArrayInputStream(bytes), bodyStream))
.flatMap(bodyStream -> Mono.from(deletedMessageVault.append(deletedMessage, bodyStream)));
});
}
private Optional<Message> parseMessage(InputStream inputStream, MessageId messageId) {
DefaultMessageBuilder messageBuilder = new DefaultMessageBuilder();
messageBuilder.setMimeEntityConfig(MimeConfig.PERMISSIVE);
messageBuilder.setDecodeMonitor(DecodeMonitor.SILENT);
try {
return Optional.ofNullable(messageBuilder.parseMessage(inputStream));
} catch (MimeIOException e) {
LOGGER.warn("Can not parse the message {}", messageId, e);
return Optional.empty();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private MaybeSender retrieveSender(Optional<Message> mimeMessage) {
return mimeMessage
.map(Message::getSender)
.map(Mailbox::getAddress)
.map(MaybeSender::getMailSender)
.orElse(MaybeSender.nullSender());
}
private Set<MailAddress> retrieveRecipients(Optional<Message> maybeMessage) {
return maybeMessage.map(message -> Envelope.fromMime4JMessage(message, Envelope.ValidationPolicy.IGNORE))
.map(Envelope::getRecipients)
.orElse(ImmutableSet.of());
}
}