[ENHANCEMENT] Workqueue for the deleted message vault (#2131)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index 6e64c8f..e01cc96 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -23,6 +23,7 @@
 import static org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles.ConsistencyChoice.WEAK;
 import static org.apache.james.util.FunctionalUtils.negate;
 
+import java.util.Date;
 import java.util.Optional;
 import java.util.Set;
 
@@ -30,6 +31,7 @@
 
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
+import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.core.Username;
 import org.apache.james.events.Event;
@@ -59,6 +61,7 @@
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.store.mail.MessageMapper;
@@ -86,9 +89,72 @@
 
     }
 
+    public static class DeletedMessageCopyCommand {
+        public static DeletedMessageCopyCommand of(MessageRepresentation message, MailboxId mailboxId, Username owner) {
+            return new DeletedMessageCopyCommand(message.getMessageId(), mailboxId, owner, message.getInternalDate(),
+                message.getSize(), !message.getAttachments().isEmpty(), message.getHeaderId(), message.getBodyId());
+        }
+
+        private final MessageId messageId;
+        private final MailboxId mailboxId;
+        private final Username owner;
+        private final Date internalDate;
+        private final long size;
+        private final boolean hasAttachments;
+        private final BlobId headerId;
+        private final BlobId bodyId;
+
+        public DeletedMessageCopyCommand(MessageId messageId, MailboxId mailboxId, Username owner, Date internalDate, long size, boolean hasAttachments, BlobId headerId, BlobId bodyId) {
+            this.messageId = messageId;
+            this.mailboxId = mailboxId;
+            this.owner = owner;
+            this.internalDate = internalDate;
+            this.size = size;
+            this.hasAttachments = hasAttachments;
+            this.headerId = headerId;
+            this.bodyId = bodyId;
+        }
+
+        public Username getOwner() {
+            return owner;
+        }
+
+        public MessageId getMessageId() {
+            return messageId;
+        }
+
+        public MailboxId getMailboxId() {
+            return mailboxId;
+        }
+
+        public Date getInternalDate() {
+            return internalDate;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public boolean hasAttachments() {
+            return hasAttachments;
+        }
+
+        public BlobId getHeaderId() {
+            return headerId;
+        }
+
+        public BlobId getBodyId() {
+            return bodyId;
+        }
+    }
+
     @FunctionalInterface
     public interface DeletionCallback {
-        Mono<Void> forMessage(MessageRepresentation message, MailboxId mailboxId, Username owner);
+        default Mono<Void> forMessage(MessageRepresentation message, MailboxId mailboxId, Username owner) {
+            return forMessage(DeletedMessageCopyCommand.of(message, mailboxId, owner));
+        }
+        
+        Mono<Void> forMessage(DeletedMessageCopyCommand copyCommand);
     }
 
     private final CassandraThreadDAO threadDAO;
diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
index 9003b03..40b6bf0 100644
--- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
+++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
@@ -34,10 +34,7 @@
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.MaybeSender;
-import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.DeleteMessageListener;
-import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
-import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mime4j.MimeIOException;
 import org.apache.james.mime4j.codec.DecodeMonitor;
@@ -71,29 +68,30 @@
     }
 
     @Override
-    public Mono<Void> forMessage(MessageRepresentation message, MailboxId mailboxId, Username owner) {
-        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), message.getHeaderId(), BlobStore.StoragePolicy.LOW_COST))
+    public Mono<Void> forMessage(DeleteMessageListener.DeletedMessageCopyCommand copyCommand) {
+        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), copyCommand.getHeaderId(), BlobStore.StoragePolicy.LOW_COST))
             .flatMap(bytes -> {
-                Optional<Message> mimeMessage = parseMessage(new ByteArrayInputStream(bytes), message.getMessageId());
+                Optional<Message> mimeMessage = parseMessage(new ByteArrayInputStream(bytes), copyCommand.getMessageId());
                 DeletedMessage deletedMessage = DeletedMessage.builder()
-                    .messageId(message.getMessageId())
-                    .originMailboxes(mailboxId)
-                    .user(owner)
-                    .deliveryDate(ZonedDateTime.ofInstant(message.getInternalDate().toInstant(), ZoneOffset.UTC))
+                    .messageId(copyCommand.getMessageId())
+                    .originMailboxes(copyCommand.getMailboxId())
+                    .user(copyCommand.getOwner())
+                    .deliveryDate(ZonedDateTime.ofInstant(copyCommand.getInternalDate().toInstant(), ZoneOffset.UTC))
                     .deletionDate(ZonedDateTime.ofInstant(clock.instant(), ZoneOffset.UTC))
                     .sender(retrieveSender(mimeMessage))
                     .recipients(retrieveRecipients(mimeMessage))
-                    .hasAttachment(!message.getAttachments().isEmpty())
-                    .size(message.getSize())
+                    .hasAttachment(copyCommand.hasAttachments())
+                    .size(copyCommand.getSize())
                     .subject(mimeMessage.map(Message::getSubject))
                     .build();
 
-                return Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), message.getBodyId(), BlobStore.StoragePolicy.LOW_COST))
+                return Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), copyCommand.getBodyId(), BlobStore.StoragePolicy.LOW_COST))
                     .map(bodyStream -> new SequenceInputStream(new ByteArrayInputStream(bytes), bodyStream))
                     .flatMap(bodyStream -> Mono.from(deletedMessageVault.append(deletedMessage, bodyStream)));
             });
     }
 
+
     private Optional<Message> parseMessage(InputStream inputStream, MessageId messageId) {
         DefaultMessageBuilder messageBuilder = new DefaultMessageBuilder();
         messageBuilder.setMimeEntityConfig(MimeConfig.PERMISSIVE);
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
index c6e46fb..2b7a34a 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
@@ -32,9 +32,11 @@
 
 public class VaultConfiguration {
     public static final VaultConfiguration DEFAULT =
-        new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
+        new VaultConfiguration(false, false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
     public static final VaultConfiguration ENABLED_DEFAULT =
-        new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
+        new VaultConfiguration(true, false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
+    public static final VaultConfiguration ENABLED_WORKQUEUE =
+        new VaultConfiguration(true, true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
 
     public static VaultConfiguration from(Configuration propertiesConfiguration) {
         Duration retentionPeriod = Optional.ofNullable(propertiesConfiguration.getString("retentionPeriod"))
@@ -43,15 +45,18 @@
         String restoreLocation = Optional.ofNullable(propertiesConfiguration.getString("restoreLocation"))
             .orElse(DEFAULT.getRestoreLocation());
         boolean enabled = propertiesConfiguration.getBoolean("enabled", false);
-        return new VaultConfiguration(enabled, retentionPeriod, restoreLocation);
+        boolean workQueueEnabled = propertiesConfiguration.getBoolean("workQueueEnabled", false);
+        return new VaultConfiguration(enabled, workQueueEnabled, retentionPeriod, restoreLocation);
     }
 
     private final boolean enabled;
+    private final boolean workQueueEnabled;
     private final Duration retentionPeriod;
     private final String restoreLocation;
 
-    VaultConfiguration(boolean enabled, Duration retentionPeriod, String restoreLocation) {
+    VaultConfiguration(boolean enabled, boolean workQueueEnabled, Duration retentionPeriod, String restoreLocation) {
         this.enabled = enabled;
+        this.workQueueEnabled = workQueueEnabled;
         Preconditions.checkNotNull(retentionPeriod);
         Preconditions.checkNotNull(restoreLocation);
 
@@ -63,6 +68,10 @@
         return enabled;
     }
 
+    public boolean isWorkQueueEnabled() {
+        return workQueueEnabled;
+    }
+
     public Duration getRetentionPeriod() {
         return retentionPeriod;
     }
@@ -78,13 +87,14 @@
 
             return Objects.equals(this.retentionPeriod, that.retentionPeriod)
                 && Objects.equals(this.restoreLocation, that.restoreLocation)
-                && Objects.equals(this.enabled, that.enabled);
+                && Objects.equals(this.enabled, that.enabled)
+                && Objects.equals(this.workQueueEnabled, that.workQueueEnabled);
         }
         return false;
     }
 
     @Override
     public final int hashCode() {
-        return Objects.hash(retentionPeriod, restoreLocation, enabled);
+        return Objects.hash(retentionPeriod, restoreLocation, enabled, workQueueEnabled);
     }
 }
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
index 656af11..6fe39dd 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
@@ -40,13 +40,13 @@
 
     @Test
     void constructorShouldThrowWhenRetentionPeriodIsNull() {
-        assertThatThrownBy(() -> new VaultConfiguration(true, null, DefaultMailboxes.RESTORED_MESSAGES))
+        assertThatThrownBy(() -> new VaultConfiguration(true, false, null, DefaultMailboxes.RESTORED_MESSAGES))
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
     void constructorShouldThrowWhenRestoreLocationIsNull() {
-        assertThatThrownBy(() -> new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), null))
+        assertThatThrownBy(() -> new VaultConfiguration(true, false, ChronoUnit.YEARS.getDuration(), null))
             .isInstanceOf(NullPointerException.class);
     }
 
@@ -62,7 +62,7 @@
         configuration.addProperty("restoreLocation", "INBOX");
 
         assertThat(VaultConfiguration.from(configuration)).isEqualTo(
-            new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX));
+            new VaultConfiguration(false, false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX));
     }
 
     @Test
@@ -71,7 +71,7 @@
         configuration.addProperty("retentionPeriod", "15d");
 
         assertThat(VaultConfiguration.from(configuration)).isEqualTo(
-            new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
+            new VaultConfiguration(false, false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
     }
 
     @Test
@@ -80,7 +80,7 @@
         configuration.addProperty("retentionPeriod", "15h");
 
         assertThat(VaultConfiguration.from(configuration)).isEqualTo(
-            new VaultConfiguration(false, Duration.ofHours(15), DefaultMailboxes.RESTORED_MESSAGES));
+            new VaultConfiguration(false, false, Duration.ofHours(15), DefaultMailboxes.RESTORED_MESSAGES));
     }
 
     @Test
@@ -89,7 +89,7 @@
         configuration.addProperty("retentionPeriod", "15");
 
         assertThat(VaultConfiguration.from(configuration)).isEqualTo(
-            new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
+            new VaultConfiguration(false, false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
     }
 
     @Test
diff --git a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index 7607d56..02ccef2 100644
--- a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -57,6 +57,7 @@
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
 import org.apache.james.modules.mailbox.CassandraQuotaMailingModule;
 import org.apache.james.modules.mailbox.CassandraSessionModule;
+import org.apache.james.modules.mailbox.DistributedDeletedMessageVaultModule;
 import org.apache.james.modules.mailbox.TikaMailboxModule;
 import org.apache.james.modules.mailrepository.CassandraMailRepositoryModule;
 import org.apache.james.modules.metrics.CassandraMetricsModule;
@@ -226,6 +227,11 @@
     }
 
     private static Module chooseDeletedMessageVault(VaultConfiguration vaultConfiguration) {
+        if (vaultConfiguration.isEnabled() && vaultConfiguration.isWorkQueueEnabled()) {
+            return Modules.combine(
+                new DistributedDeletedMessageVaultModule(),
+                new DeletedMessageVaultRoutesModule());
+        }
         if (vaultConfiguration.isEnabled()) {
             return Modules.combine(
                 new CassandraDeletedMessageVaultModule(),
diff --git a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
index f7c129a..b53b60d 100644
--- a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
+++ b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
@@ -64,6 +64,7 @@
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaLegacyModule;
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
 import org.apache.james.modules.mailbox.CassandraSessionModule;
+import org.apache.james.modules.mailbox.DistributedDeletedMessageVaultModule;
 import org.apache.james.modules.mailbox.TikaMailboxModule;
 import org.apache.james.modules.mailrepository.CassandraMailRepositoryModule;
 import org.apache.james.modules.metrics.CassandraMetricsModule;
@@ -216,10 +217,15 @@
     }
 
     private static Module chooseDeletedMessageVault(VaultConfiguration vaultConfiguration) {
+        if (vaultConfiguration.isEnabled() && vaultConfiguration.isWorkQueueEnabled()) {
+            return Modules.combine(
+                new DistributedDeletedMessageVaultModule(),
+                new DeletedMessageVaultRoutesModule());
+        }
         if (vaultConfiguration.isEnabled()) {
             return Modules.combine(
-                new CassandraDeletedMessageVaultModule(),
-                new DeletedMessageVaultRoutesModule());
+                new DistributedDeletedMessageVaultModule(),
+                new CassandraDeletedMessageVaultModule());
         }
         return binder -> {
 
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
new file mode 100644
index 0000000..14ecde5
--- /dev/null
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -0,0 +1,266 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.mailbox;
+
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+
+import java.util.Date;
+import java.util.Optional;
+
+import jakarta.annotation.PreDestroy;
+import jakarta.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.core.Username;
+import org.apache.james.lifecycle.api.Startable;
+import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.vault.metadata.DeletedMessageVaultDeletionCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.AMQP;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.Sender;
+
+public class DistributedDeletedMessageVaultDeletionCallback implements DeleteMessageListener.DeletionCallback, Startable {
+    public static final Logger LOGGER = LoggerFactory.getLogger(DistributedDeletedMessageVaultDeletionCallback.class);
+
+    private static class CopyCommandDTO {
+        public static CopyCommandDTO of(DeleteMessageListener.DeletedMessageCopyCommand command) {
+            return new CopyCommandDTO(
+                command.getMessageId().serialize(),
+                command.getMailboxId().serialize(),
+                command.getOwner().asString(),
+                command.getInternalDate(),
+                command.getSize(),
+                command.hasAttachments(),
+                command.getHeaderId().asString(),
+                command.getBodyId().asString());
+        }
+
+        private final String messageId;
+        private final String mailboxId;
+        private final String owner;
+        private final Date internalDate;
+        private final long size;
+        private final boolean hasAttachments;
+        private final String headerId;
+        private final String bodyId;
+
+        @JsonCreator
+        public CopyCommandDTO(@JsonProperty("messageId") String messageId,
+                              @JsonProperty("mailboxId") String mailboxId,
+                              @JsonProperty("owner") String owner,
+                              @JsonProperty("internalDate") Date internalDate,
+                              @JsonProperty("size") long size,
+                              @JsonProperty("hasAttachments") boolean hasAttachments,
+                              @JsonProperty("headerId") String headerId,
+                              @JsonProperty("bodyId") String bodyId) {
+            this.messageId = messageId;
+            this.mailboxId = mailboxId;
+            this.owner = owner;
+            this.internalDate = internalDate;
+            this.size = size;
+            this.hasAttachments = hasAttachments;
+            this.headerId = headerId;
+            this.bodyId = bodyId;
+        }
+
+        public String getMessageId() {
+            return messageId;
+        }
+
+        public String getMailboxId() {
+            return mailboxId;
+        }
+
+        public String getOwner() {
+            return owner;
+        }
+
+        public Date getInternalDate() {
+            return internalDate;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public boolean isHasAttachments() {
+            return hasAttachments;
+        }
+
+        public String getHeaderId() {
+            return headerId;
+        }
+
+        public String getBodyId() {
+            return bodyId;
+        }
+
+        @JsonIgnore
+        DeleteMessageListener.DeletedMessageCopyCommand asPojo(MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory, BlobId.Factory blobIdFactory) {
+            return new DeleteMessageListener.DeletedMessageCopyCommand(messageIdFactory.fromString(messageId),
+                mailboxIdFactory.fromString(messageId),
+                Username.of(owner),
+                internalDate,
+                size,
+                hasAttachments,
+                blobIdFactory.from(headerId),
+                blobIdFactory.from(bodyId));
+        }
+    }
+
+    private static final String EXCHANGE = "deleted-message-vault";
+    private static final String QUEUE = "deleted-message-vault-work-queue";
+    private static final String DEAD_LETTER = QUEUE + "-dead-letter";
+    private static final boolean REQUEUE = true;
+    private static final int QOS = 5;
+
+    private final ReactorRabbitMQChannelPool channelPool;
+    private final RabbitMQConfiguration rabbitMQConfiguration;
+    private final DeletedMessageVaultDeletionCallback callback;
+    private final Sender sender;
+    private final ObjectMapper objectMapper;
+    private final MailboxId.Factory mailboxIdFactory;
+    private final MessageId.Factory messageIdFactory;
+    private final BlobId.Factory blobIdFactory;
+    private Receiver receiver;
+    private Disposable disposable;
+
+    @Inject
+    public DistributedDeletedMessageVaultDeletionCallback(Sender sender,
+                                                          ReactorRabbitMQChannelPool channelPool,
+                                                          RabbitMQConfiguration rabbitMQConfiguration,
+                                                          DeletedMessageVaultDeletionCallback callback,
+                                                          MailboxId.Factory mailboxIdFactory,
+                                                          MessageId.Factory messageIdFactory,
+                                                          BlobId.Factory blobIdFactory) {
+        this.sender = sender;
+        this.rabbitMQConfiguration = rabbitMQConfiguration;
+        this.callback = callback;
+        this.mailboxIdFactory = mailboxIdFactory;
+        this.messageIdFactory = messageIdFactory;
+        this.blobIdFactory = blobIdFactory;
+        this.objectMapper = new ObjectMapper();
+        this.channelPool = channelPool;
+    }
+
+    public void init() {
+        Flux.concat(
+                sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE)
+                    .durable(DURABLE)
+                    .type(DIRECT_EXCHANGE)),
+                sender.declareQueue(QueueSpecification.queue(DEAD_LETTER)
+                    .durable(DURABLE)
+                    .exclusive(!EXCLUSIVE)
+                    .autoDelete(!AUTO_DELETE)
+                    .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
+                        .deadLetter(DEAD_LETTER)
+                        .build())),
+                sender.declareQueue(QueueSpecification.queue(QUEUE)
+                    .durable(DURABLE)
+                    .exclusive(!EXCLUSIVE)
+                    .autoDelete(!AUTO_DELETE)
+                    .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
+                        .deadLetter(DEAD_LETTER)
+                        .build())),
+                sender.bind(BindingSpecification.binding()
+                    .exchange(EXCHANGE)
+                    .queue(QUEUE)
+                    .routingKey(EMPTY_ROUTING_KEY)))
+            .then()
+            .block();
+
+        receiver = channelPool.createReceiver();
+        disposable = receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS))
+            .flatMap(this::handleMessage)
+            .subscribeOn(Schedulers.boundedElastic())
+            .subscribe();
+    }
+
+    @PreDestroy
+    public void stop() {
+        Optional.ofNullable(disposable).ifPresent(Disposable::dispose);
+        Optional.ofNullable(receiver).ifPresent(Receiver::close);
+    }
+
+    private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
+        try {
+            CopyCommandDTO copyCommandDTO = objectMapper.readValue(delivery.getBody(), CopyCommandDTO.class);
+
+            return callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, messageIdFactory, blobIdFactory))
+                .doOnError(e -> {
+                    LOGGER.error("Failed executing deletion callback for {}", copyCommandDTO.messageId, e);
+                    delivery.nack(REQUEUE);
+                })
+                .doOnSuccess(any -> delivery.ack())
+                .doOnCancel(() -> delivery.nack(REQUEUE));
+        } catch (Exception e) {
+            LOGGER.error("Deserialization error: reject poisonous message for distributed Deleted message vault callback", e);
+            // Deserialization error: reject poisonous messages
+            delivery.nack(!REQUEUE);
+            return Mono.empty();
+        }
+    }
+
+    @Override
+    public Mono<Void> forMessage(DeleteMessageListener.DeletedMessageCopyCommand command) {
+        CopyCommandDTO dto = CopyCommandDTO.of(command);
+        try {
+            byte[] bytes = objectMapper.writeValueAsBytes(dto);
+            return sender.send(Mono.just(new OutboundMessage(EXCHANGE, EMPTY_ROUTING_KEY, new AMQP.BasicProperties.Builder()
+                .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+                .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+                .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+                .build(), bytes)));
+        } catch (JsonProcessingException e) {
+            return Mono.error(e);
+        }
+    }
+
+
+}
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
new file mode 100644
index 0000000..a291622
--- /dev/null
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
@@ -0,0 +1,78 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.mailbox;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.modules.vault.DeletedMessageVaultModule;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
+import org.apache.james.vault.DeletedMessageVault;
+import org.apache.james.vault.blob.BlobStoreDeletedMessageVault;
+import org.apache.james.vault.blob.BucketNameGenerator;
+import org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter;
+import org.apache.james.vault.metadata.CassandraDeletedMessageMetadataVault;
+import org.apache.james.vault.metadata.DeletedMessageMetadataModule;
+import org.apache.james.vault.metadata.DeletedMessageMetadataVault;
+import org.apache.james.vault.metadata.MetadataDAO;
+import org.apache.james.vault.metadata.StorageInformationDAO;
+import org.apache.james.vault.metadata.UserPerBucketDAO;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
+
+public class DistributedDeletedMessageVaultModule extends AbstractModule {
+    @Override
+    protected void configure() {
+        install(new DeletedMessageVaultModule());
+
+        Multibinder<CassandraModule> cassandraDataDefinitions = Multibinder.newSetBinder(binder(), CassandraModule.class);
+        cassandraDataDefinitions
+            .addBinding()
+            .toInstance(DeletedMessageMetadataModule.MODULE);
+
+        bind(MetadataDAO.class).in(Scopes.SINGLETON);
+        bind(StorageInformationDAO.class).in(Scopes.SINGLETON);
+        bind(UserPerBucketDAO.class).in(Scopes.SINGLETON);
+        bind(DeletedMessageWithStorageInformationConverter.class).in(Scopes.SINGLETON);
+
+        bind(CassandraDeletedMessageMetadataVault.class).in(Scopes.SINGLETON);
+        bind(DeletedMessageMetadataVault.class)
+            .to(CassandraDeletedMessageMetadataVault.class);
+
+        bind(BucketNameGenerator.class).in(Scopes.SINGLETON);
+        bind(BlobStoreDeletedMessageVault.class).in(Scopes.SINGLETON);
+        bind(DeletedMessageVault.class)
+            .to(BlobStoreDeletedMessageVault.class);
+
+        Multibinder.newSetBinder(binder(), DeleteMessageListener.DeletionCallback.class)
+            .addBinding()
+            .to(DistributedDeletedMessageVaultDeletionCallback.class);
+    }
+
+    @ProvidesIntoSet
+    InitializationOperation init(DistributedDeletedMessageVaultDeletionCallback callback) {
+        return InitilizationOperationBuilder
+            .forClass(DistributedDeletedMessageVaultDeletionCallback.class)
+            .init(callback::init);
+    }
+}
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
new file mode 100644
index 0000000..38a5773
--- /dev/null
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
@@ -0,0 +1,81 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.integration.rabbitmq.vault;
+
+import org.apache.james.CassandraExtension;
+import org.apache.james.CassandraRabbitMQJamesConfiguration;
+import org.apache.james.CassandraRabbitMQJamesServerMain;
+import org.apache.james.DockerOpenSearchExtension;
+import org.apache.james.GuiceJamesServer;
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.SearchConfiguration;
+import org.apache.james.junit.categories.BasicFeature;
+import org.apache.james.modules.AwsS3BlobStoreExtension;
+import org.apache.james.modules.RabbitMQExtension;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.blobstore.BlobStoreConfiguration;
+import org.apache.james.vault.VaultConfiguration;
+import org.apache.james.webadmin.integration.vault.DeletedMessageVaultIntegrationTest;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class WorkQueueEnabledDeletedMessageVaultIntegrationTest extends DeletedMessageVaultIntegrationTest {
+
+    private static final DockerOpenSearchExtension ES_EXTENSION = new DockerOpenSearchExtension();
+
+    @RegisterExtension
+    static JamesServerExtension testExtension = new JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
+        CassandraRabbitMQJamesConfiguration.builder()
+            .workingDirectory(tmpDir)
+            .configurationFromClasspath()
+            .blobStore(BlobStoreConfiguration.builder()
+                    .s3()
+                    .disableCache()
+                    .deduplication()
+                    .noCryptoConfig())
+            .searchConfiguration(SearchConfiguration.openSearch())
+            .vaultConfiguration(VaultConfiguration.ENABLED_WORKQUEUE)
+            .build())
+        .extension(ES_EXTENSION)
+        .extension(new CassandraExtension())
+        .extension(new AwsS3BlobStoreExtension())
+        .extension(new RabbitMQExtension())
+        .extension(new ClockExtension())
+        .server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration)
+            .overrideWith(new TestJMAPServerModule()))
+        .build();
+
+    @Override
+    protected void awaitSearchUpToDate() {
+        ES_EXTENSION.await();
+    }
+
+    @Disabled("JAMES-2688 Unstable test")
+    @Test
+    @Tag(BasicFeature.TAG)
+    @Override
+    public void vaultExportShouldExportZipContainsVaultMessagesToShareeWhenImapDeletedMailbox(GuiceJamesServer jmapServer) {
+
+    }
+
+}