[ENHANCEMENT] Workqueue for the deleted message vault (#2131)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index 6e64c8f..e01cc96 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -23,6 +23,7 @@
import static org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles.ConsistencyChoice.WEAK;
import static org.apache.james.util.FunctionalUtils.negate;
+import java.util.Date;
import java.util.Optional;
import java.util.Set;
@@ -30,6 +31,7 @@
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
+import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.core.Username;
import org.apache.james.events.Event;
@@ -59,6 +61,7 @@
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.store.mail.MessageMapper;
@@ -86,9 +89,72 @@
}
+ public static class DeletedMessageCopyCommand {
+ public static DeletedMessageCopyCommand of(MessageRepresentation message, MailboxId mailboxId, Username owner) {
+ return new DeletedMessageCopyCommand(message.getMessageId(), mailboxId, owner, message.getInternalDate(),
+ message.getSize(), !message.getAttachments().isEmpty(), message.getHeaderId(), message.getBodyId());
+ }
+
+ private final MessageId messageId;
+ private final MailboxId mailboxId;
+ private final Username owner;
+ private final Date internalDate;
+ private final long size;
+ private final boolean hasAttachments;
+ private final BlobId headerId;
+ private final BlobId bodyId;
+
+ public DeletedMessageCopyCommand(MessageId messageId, MailboxId mailboxId, Username owner, Date internalDate, long size, boolean hasAttachments, BlobId headerId, BlobId bodyId) {
+ this.messageId = messageId;
+ this.mailboxId = mailboxId;
+ this.owner = owner;
+ this.internalDate = internalDate;
+ this.size = size;
+ this.hasAttachments = hasAttachments;
+ this.headerId = headerId;
+ this.bodyId = bodyId;
+ }
+
+ public Username getOwner() {
+ return owner;
+ }
+
+ public MessageId getMessageId() {
+ return messageId;
+ }
+
+ public MailboxId getMailboxId() {
+ return mailboxId;
+ }
+
+ public Date getInternalDate() {
+ return internalDate;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public boolean hasAttachments() {
+ return hasAttachments;
+ }
+
+ public BlobId getHeaderId() {
+ return headerId;
+ }
+
+ public BlobId getBodyId() {
+ return bodyId;
+ }
+ }
+
@FunctionalInterface
public interface DeletionCallback {
- Mono<Void> forMessage(MessageRepresentation message, MailboxId mailboxId, Username owner);
+ default Mono<Void> forMessage(MessageRepresentation message, MailboxId mailboxId, Username owner) {
+ return forMessage(DeletedMessageCopyCommand.of(message, mailboxId, owner));
+ }
+
+ Mono<Void> forMessage(DeletedMessageCopyCommand copyCommand);
}
private final CassandraThreadDAO threadDAO;
diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
index 9003b03..40b6bf0 100644
--- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
+++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
@@ -34,10 +34,7 @@
import org.apache.james.blob.api.BlobStore;
import org.apache.james.core.MailAddress;
import org.apache.james.core.MaybeSender;
-import org.apache.james.core.Username;
import org.apache.james.mailbox.cassandra.DeleteMessageListener;
-import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
-import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mime4j.MimeIOException;
import org.apache.james.mime4j.codec.DecodeMonitor;
@@ -71,29 +68,30 @@
}
@Override
- public Mono<Void> forMessage(MessageRepresentation message, MailboxId mailboxId, Username owner) {
- return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), message.getHeaderId(), BlobStore.StoragePolicy.LOW_COST))
+ public Mono<Void> forMessage(DeleteMessageListener.DeletedMessageCopyCommand copyCommand) {
+ return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), copyCommand.getHeaderId(), BlobStore.StoragePolicy.LOW_COST))
.flatMap(bytes -> {
- Optional<Message> mimeMessage = parseMessage(new ByteArrayInputStream(bytes), message.getMessageId());
+ Optional<Message> mimeMessage = parseMessage(new ByteArrayInputStream(bytes), copyCommand.getMessageId());
DeletedMessage deletedMessage = DeletedMessage.builder()
- .messageId(message.getMessageId())
- .originMailboxes(mailboxId)
- .user(owner)
- .deliveryDate(ZonedDateTime.ofInstant(message.getInternalDate().toInstant(), ZoneOffset.UTC))
+ .messageId(copyCommand.getMessageId())
+ .originMailboxes(copyCommand.getMailboxId())
+ .user(copyCommand.getOwner())
+ .deliveryDate(ZonedDateTime.ofInstant(copyCommand.getInternalDate().toInstant(), ZoneOffset.UTC))
.deletionDate(ZonedDateTime.ofInstant(clock.instant(), ZoneOffset.UTC))
.sender(retrieveSender(mimeMessage))
.recipients(retrieveRecipients(mimeMessage))
- .hasAttachment(!message.getAttachments().isEmpty())
- .size(message.getSize())
+ .hasAttachment(copyCommand.hasAttachments())
+ .size(copyCommand.getSize())
.subject(mimeMessage.map(Message::getSubject))
.build();
- return Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), message.getBodyId(), BlobStore.StoragePolicy.LOW_COST))
+ return Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), copyCommand.getBodyId(), BlobStore.StoragePolicy.LOW_COST))
.map(bodyStream -> new SequenceInputStream(new ByteArrayInputStream(bytes), bodyStream))
.flatMap(bodyStream -> Mono.from(deletedMessageVault.append(deletedMessage, bodyStream)));
});
}
+
private Optional<Message> parseMessage(InputStream inputStream, MessageId messageId) {
DefaultMessageBuilder messageBuilder = new DefaultMessageBuilder();
messageBuilder.setMimeEntityConfig(MimeConfig.PERMISSIVE);
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
index c6e46fb..2b7a34a 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
@@ -32,9 +32,11 @@
public class VaultConfiguration {
public static final VaultConfiguration DEFAULT =
- new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
+ new VaultConfiguration(false, false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
public static final VaultConfiguration ENABLED_DEFAULT =
- new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
+ new VaultConfiguration(true, false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
+ public static final VaultConfiguration ENABLED_WORKQUEUE =
+ new VaultConfiguration(true, true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
public static VaultConfiguration from(Configuration propertiesConfiguration) {
Duration retentionPeriod = Optional.ofNullable(propertiesConfiguration.getString("retentionPeriod"))
@@ -43,15 +45,18 @@
String restoreLocation = Optional.ofNullable(propertiesConfiguration.getString("restoreLocation"))
.orElse(DEFAULT.getRestoreLocation());
boolean enabled = propertiesConfiguration.getBoolean("enabled", false);
- return new VaultConfiguration(enabled, retentionPeriod, restoreLocation);
+ boolean workQueueEnabled = propertiesConfiguration.getBoolean("workQueueEnabled", false);
+ return new VaultConfiguration(enabled, workQueueEnabled, retentionPeriod, restoreLocation);
}
private final boolean enabled;
+ private final boolean workQueueEnabled;
private final Duration retentionPeriod;
private final String restoreLocation;
- VaultConfiguration(boolean enabled, Duration retentionPeriod, String restoreLocation) {
+ VaultConfiguration(boolean enabled, boolean workQueueEnabled, Duration retentionPeriod, String restoreLocation) {
this.enabled = enabled;
+ this.workQueueEnabled = workQueueEnabled;
Preconditions.checkNotNull(retentionPeriod);
Preconditions.checkNotNull(restoreLocation);
@@ -63,6 +68,10 @@
return enabled;
}
+ public boolean isWorkQueueEnabled() {
+ return workQueueEnabled;
+ }
+
public Duration getRetentionPeriod() {
return retentionPeriod;
}
@@ -78,13 +87,14 @@
return Objects.equals(this.retentionPeriod, that.retentionPeriod)
&& Objects.equals(this.restoreLocation, that.restoreLocation)
- && Objects.equals(this.enabled, that.enabled);
+ && Objects.equals(this.enabled, that.enabled)
+ && Objects.equals(this.workQueueEnabled, that.workQueueEnabled);
}
return false;
}
@Override
public final int hashCode() {
- return Objects.hash(retentionPeriod, restoreLocation, enabled);
+ return Objects.hash(retentionPeriod, restoreLocation, enabled, workQueueEnabled);
}
}
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
index 656af11..6fe39dd 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
@@ -40,13 +40,13 @@
@Test
void constructorShouldThrowWhenRetentionPeriodIsNull() {
- assertThatThrownBy(() -> new VaultConfiguration(true, null, DefaultMailboxes.RESTORED_MESSAGES))
+ assertThatThrownBy(() -> new VaultConfiguration(true, false, null, DefaultMailboxes.RESTORED_MESSAGES))
.isInstanceOf(NullPointerException.class);
}
@Test
void constructorShouldThrowWhenRestoreLocationIsNull() {
- assertThatThrownBy(() -> new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), null))
+ assertThatThrownBy(() -> new VaultConfiguration(true, false, ChronoUnit.YEARS.getDuration(), null))
.isInstanceOf(NullPointerException.class);
}
@@ -62,7 +62,7 @@
configuration.addProperty("restoreLocation", "INBOX");
assertThat(VaultConfiguration.from(configuration)).isEqualTo(
- new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX));
+ new VaultConfiguration(false, false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX));
}
@Test
@@ -71,7 +71,7 @@
configuration.addProperty("retentionPeriod", "15d");
assertThat(VaultConfiguration.from(configuration)).isEqualTo(
- new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
+ new VaultConfiguration(false, false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
}
@Test
@@ -80,7 +80,7 @@
configuration.addProperty("retentionPeriod", "15h");
assertThat(VaultConfiguration.from(configuration)).isEqualTo(
- new VaultConfiguration(false, Duration.ofHours(15), DefaultMailboxes.RESTORED_MESSAGES));
+ new VaultConfiguration(false, false, Duration.ofHours(15), DefaultMailboxes.RESTORED_MESSAGES));
}
@Test
@@ -89,7 +89,7 @@
configuration.addProperty("retentionPeriod", "15");
assertThat(VaultConfiguration.from(configuration)).isEqualTo(
- new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
+ new VaultConfiguration(false, false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
}
@Test
diff --git a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index 7607d56..02ccef2 100644
--- a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -57,6 +57,7 @@
import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
import org.apache.james.modules.mailbox.CassandraQuotaMailingModule;
import org.apache.james.modules.mailbox.CassandraSessionModule;
+import org.apache.james.modules.mailbox.DistributedDeletedMessageVaultModule;
import org.apache.james.modules.mailbox.TikaMailboxModule;
import org.apache.james.modules.mailrepository.CassandraMailRepositoryModule;
import org.apache.james.modules.metrics.CassandraMetricsModule;
@@ -226,6 +227,11 @@
}
private static Module chooseDeletedMessageVault(VaultConfiguration vaultConfiguration) {
+ if (vaultConfiguration.isEnabled() && vaultConfiguration.isWorkQueueEnabled()) {
+ return Modules.combine(
+ new DistributedDeletedMessageVaultModule(),
+ new DeletedMessageVaultRoutesModule());
+ }
if (vaultConfiguration.isEnabled()) {
return Modules.combine(
new CassandraDeletedMessageVaultModule(),
diff --git a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
index f7c129a..b53b60d 100644
--- a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
+++ b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
@@ -64,6 +64,7 @@
import org.apache.james.modules.mailbox.CassandraMailboxQuotaLegacyModule;
import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
import org.apache.james.modules.mailbox.CassandraSessionModule;
+import org.apache.james.modules.mailbox.DistributedDeletedMessageVaultModule;
import org.apache.james.modules.mailbox.TikaMailboxModule;
import org.apache.james.modules.mailrepository.CassandraMailRepositoryModule;
import org.apache.james.modules.metrics.CassandraMetricsModule;
@@ -216,10 +217,15 @@
}
private static Module chooseDeletedMessageVault(VaultConfiguration vaultConfiguration) {
+ if (vaultConfiguration.isEnabled() && vaultConfiguration.isWorkQueueEnabled()) {
+ return Modules.combine(
+ new DistributedDeletedMessageVaultModule(),
+ new DeletedMessageVaultRoutesModule());
+ }
if (vaultConfiguration.isEnabled()) {
return Modules.combine(
- new CassandraDeletedMessageVaultModule(),
- new DeletedMessageVaultRoutesModule());
+ new DistributedDeletedMessageVaultModule(),
+ new CassandraDeletedMessageVaultModule());
}
return binder -> {
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
new file mode 100644
index 0000000..14ecde5
--- /dev/null
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -0,0 +1,266 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.modules.mailbox;
+
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+
+import java.util.Date;
+import java.util.Optional;
+
+import jakarta.annotation.PreDestroy;
+import jakarta.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.core.Username;
+import org.apache.james.lifecycle.api.Startable;
+import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.vault.metadata.DeletedMessageVaultDeletionCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.AMQP;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.Sender;
+
+public class DistributedDeletedMessageVaultDeletionCallback implements DeleteMessageListener.DeletionCallback, Startable {
+ public static final Logger LOGGER = LoggerFactory.getLogger(DistributedDeletedMessageVaultDeletionCallback.class);
+
+ private static class CopyCommandDTO {
+ public static CopyCommandDTO of(DeleteMessageListener.DeletedMessageCopyCommand command) {
+ return new CopyCommandDTO(
+ command.getMessageId().serialize(),
+ command.getMailboxId().serialize(),
+ command.getOwner().asString(),
+ command.getInternalDate(),
+ command.getSize(),
+ command.hasAttachments(),
+ command.getHeaderId().asString(),
+ command.getBodyId().asString());
+ }
+
+ private final String messageId;
+ private final String mailboxId;
+ private final String owner;
+ private final Date internalDate;
+ private final long size;
+ private final boolean hasAttachments;
+ private final String headerId;
+ private final String bodyId;
+
+ @JsonCreator
+ public CopyCommandDTO(@JsonProperty("messageId") String messageId,
+ @JsonProperty("mailboxId") String mailboxId,
+ @JsonProperty("owner") String owner,
+ @JsonProperty("internalDate") Date internalDate,
+ @JsonProperty("size") long size,
+ @JsonProperty("hasAttachments") boolean hasAttachments,
+ @JsonProperty("headerId") String headerId,
+ @JsonProperty("bodyId") String bodyId) {
+ this.messageId = messageId;
+ this.mailboxId = mailboxId;
+ this.owner = owner;
+ this.internalDate = internalDate;
+ this.size = size;
+ this.hasAttachments = hasAttachments;
+ this.headerId = headerId;
+ this.bodyId = bodyId;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public String getMailboxId() {
+ return mailboxId;
+ }
+
+ public String getOwner() {
+ return owner;
+ }
+
+ public Date getInternalDate() {
+ return internalDate;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public boolean isHasAttachments() {
+ return hasAttachments;
+ }
+
+ public String getHeaderId() {
+ return headerId;
+ }
+
+ public String getBodyId() {
+ return bodyId;
+ }
+
+ @JsonIgnore
+ DeleteMessageListener.DeletedMessageCopyCommand asPojo(MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory, BlobId.Factory blobIdFactory) {
+ return new DeleteMessageListener.DeletedMessageCopyCommand(messageIdFactory.fromString(messageId),
+ mailboxIdFactory.fromString(messageId),
+ Username.of(owner),
+ internalDate,
+ size,
+ hasAttachments,
+ blobIdFactory.from(headerId),
+ blobIdFactory.from(bodyId));
+ }
+ }
+
+ private static final String EXCHANGE = "deleted-message-vault";
+ private static final String QUEUE = "deleted-message-vault-work-queue";
+ private static final String DEAD_LETTER = QUEUE + "-dead-letter";
+ private static final boolean REQUEUE = true;
+ private static final int QOS = 5;
+
+ private final ReactorRabbitMQChannelPool channelPool;
+ private final RabbitMQConfiguration rabbitMQConfiguration;
+ private final DeletedMessageVaultDeletionCallback callback;
+ private final Sender sender;
+ private final ObjectMapper objectMapper;
+ private final MailboxId.Factory mailboxIdFactory;
+ private final MessageId.Factory messageIdFactory;
+ private final BlobId.Factory blobIdFactory;
+ private Receiver receiver;
+ private Disposable disposable;
+
+ @Inject
+ public DistributedDeletedMessageVaultDeletionCallback(Sender sender,
+ ReactorRabbitMQChannelPool channelPool,
+ RabbitMQConfiguration rabbitMQConfiguration,
+ DeletedMessageVaultDeletionCallback callback,
+ MailboxId.Factory mailboxIdFactory,
+ MessageId.Factory messageIdFactory,
+ BlobId.Factory blobIdFactory) {
+ this.sender = sender;
+ this.rabbitMQConfiguration = rabbitMQConfiguration;
+ this.callback = callback;
+ this.mailboxIdFactory = mailboxIdFactory;
+ this.messageIdFactory = messageIdFactory;
+ this.blobIdFactory = blobIdFactory;
+ this.objectMapper = new ObjectMapper();
+ this.channelPool = channelPool;
+ }
+
+ public void init() {
+ Flux.concat(
+ sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE)
+ .durable(DURABLE)
+ .type(DIRECT_EXCHANGE)),
+ sender.declareQueue(QueueSpecification.queue(DEAD_LETTER)
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(!AUTO_DELETE)
+ .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
+ .deadLetter(DEAD_LETTER)
+ .build())),
+ sender.declareQueue(QueueSpecification.queue(QUEUE)
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(!AUTO_DELETE)
+ .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
+ .deadLetter(DEAD_LETTER)
+ .build())),
+ sender.bind(BindingSpecification.binding()
+ .exchange(EXCHANGE)
+ .queue(QUEUE)
+ .routingKey(EMPTY_ROUTING_KEY)))
+ .then()
+ .block();
+
+ receiver = channelPool.createReceiver();
+ disposable = receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS))
+ .flatMap(this::handleMessage)
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe();
+ }
+
+ @PreDestroy
+ public void stop() {
+ Optional.ofNullable(disposable).ifPresent(Disposable::dispose);
+ Optional.ofNullable(receiver).ifPresent(Receiver::close);
+ }
+
+ private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
+ try {
+ CopyCommandDTO copyCommandDTO = objectMapper.readValue(delivery.getBody(), CopyCommandDTO.class);
+
+ return callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, messageIdFactory, blobIdFactory))
+ .doOnError(e -> {
+ LOGGER.error("Failed executing deletion callback for {}", copyCommandDTO.messageId, e);
+ delivery.nack(REQUEUE);
+ })
+ .doOnSuccess(any -> delivery.ack())
+ .doOnCancel(() -> delivery.nack(REQUEUE));
+ } catch (Exception e) {
+ LOGGER.error("Deserialization error: reject poisonous message for distributed Deleted message vault callback", e);
+ // Deserialization error: reject poisonous messages
+ delivery.nack(!REQUEUE);
+ return Mono.empty();
+ }
+ }
+
+ @Override
+ public Mono<Void> forMessage(DeleteMessageListener.DeletedMessageCopyCommand command) {
+ CopyCommandDTO dto = CopyCommandDTO.of(command);
+ try {
+ byte[] bytes = objectMapper.writeValueAsBytes(dto);
+ return sender.send(Mono.just(new OutboundMessage(EXCHANGE, EMPTY_ROUTING_KEY, new AMQP.BasicProperties.Builder()
+ .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+ .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+ .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+ .build(), bytes)));
+ } catch (JsonProcessingException e) {
+ return Mono.error(e);
+ }
+ }
+
+
+}
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
new file mode 100644
index 0000000..a291622
--- /dev/null
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
@@ -0,0 +1,78 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.modules.mailbox;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.modules.vault.DeletedMessageVaultModule;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
+import org.apache.james.vault.DeletedMessageVault;
+import org.apache.james.vault.blob.BlobStoreDeletedMessageVault;
+import org.apache.james.vault.blob.BucketNameGenerator;
+import org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter;
+import org.apache.james.vault.metadata.CassandraDeletedMessageMetadataVault;
+import org.apache.james.vault.metadata.DeletedMessageMetadataModule;
+import org.apache.james.vault.metadata.DeletedMessageMetadataVault;
+import org.apache.james.vault.metadata.MetadataDAO;
+import org.apache.james.vault.metadata.StorageInformationDAO;
+import org.apache.james.vault.metadata.UserPerBucketDAO;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
+
+public class DistributedDeletedMessageVaultModule extends AbstractModule {
+ @Override
+ protected void configure() {
+ install(new DeletedMessageVaultModule());
+
+ Multibinder<CassandraModule> cassandraDataDefinitions = Multibinder.newSetBinder(binder(), CassandraModule.class);
+ cassandraDataDefinitions
+ .addBinding()
+ .toInstance(DeletedMessageMetadataModule.MODULE);
+
+ bind(MetadataDAO.class).in(Scopes.SINGLETON);
+ bind(StorageInformationDAO.class).in(Scopes.SINGLETON);
+ bind(UserPerBucketDAO.class).in(Scopes.SINGLETON);
+ bind(DeletedMessageWithStorageInformationConverter.class).in(Scopes.SINGLETON);
+
+ bind(CassandraDeletedMessageMetadataVault.class).in(Scopes.SINGLETON);
+ bind(DeletedMessageMetadataVault.class)
+ .to(CassandraDeletedMessageMetadataVault.class);
+
+ bind(BucketNameGenerator.class).in(Scopes.SINGLETON);
+ bind(BlobStoreDeletedMessageVault.class).in(Scopes.SINGLETON);
+ bind(DeletedMessageVault.class)
+ .to(BlobStoreDeletedMessageVault.class);
+
+ Multibinder.newSetBinder(binder(), DeleteMessageListener.DeletionCallback.class)
+ .addBinding()
+ .to(DistributedDeletedMessageVaultDeletionCallback.class);
+ }
+
+ @ProvidesIntoSet
+ InitializationOperation init(DistributedDeletedMessageVaultDeletionCallback callback) {
+ return InitilizationOperationBuilder
+ .forClass(DistributedDeletedMessageVaultDeletionCallback.class)
+ .init(callback::init);
+ }
+}
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
new file mode 100644
index 0000000..38a5773
--- /dev/null
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
@@ -0,0 +1,81 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.integration.rabbitmq.vault;
+
+import org.apache.james.CassandraExtension;
+import org.apache.james.CassandraRabbitMQJamesConfiguration;
+import org.apache.james.CassandraRabbitMQJamesServerMain;
+import org.apache.james.DockerOpenSearchExtension;
+import org.apache.james.GuiceJamesServer;
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.SearchConfiguration;
+import org.apache.james.junit.categories.BasicFeature;
+import org.apache.james.modules.AwsS3BlobStoreExtension;
+import org.apache.james.modules.RabbitMQExtension;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.blobstore.BlobStoreConfiguration;
+import org.apache.james.vault.VaultConfiguration;
+import org.apache.james.webadmin.integration.vault.DeletedMessageVaultIntegrationTest;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class WorkQueueEnabledDeletedMessageVaultIntegrationTest extends DeletedMessageVaultIntegrationTest {
+
+ private static final DockerOpenSearchExtension ES_EXTENSION = new DockerOpenSearchExtension();
+
+ @RegisterExtension
+ static JamesServerExtension testExtension = new JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
+ CassandraRabbitMQJamesConfiguration.builder()
+ .workingDirectory(tmpDir)
+ .configurationFromClasspath()
+ .blobStore(BlobStoreConfiguration.builder()
+ .s3()
+ .disableCache()
+ .deduplication()
+ .noCryptoConfig())
+ .searchConfiguration(SearchConfiguration.openSearch())
+ .vaultConfiguration(VaultConfiguration.ENABLED_WORKQUEUE)
+ .build())
+ .extension(ES_EXTENSION)
+ .extension(new CassandraExtension())
+ .extension(new AwsS3BlobStoreExtension())
+ .extension(new RabbitMQExtension())
+ .extension(new ClockExtension())
+ .server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration)
+ .overrideWith(new TestJMAPServerModule()))
+ .build();
+
+ @Override
+ protected void awaitSearchUpToDate() {
+ ES_EXTENSION.await();
+ }
+
+ @Disabled("JAMES-2688 Unstable test")
+ @Test
+ @Tag(BasicFeature.TAG)
+ @Override
+ public void vaultExportShouldExportZipContainsVaultMessagesToShareeWhenImapDeletedMailbox(GuiceJamesServer jmapServer) {
+
+ }
+
+}