JAMES-3605 Implement DeletedMessageVaultWorkQueueReconnectionHandler

So the consumer for deleted message vault queue could reconnect if needed.
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
new file mode 100644
index 0000000..0893a66
--- /dev/null
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
@@ -0,0 +1,43 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.mailbox;
+
+import jakarta.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.reactivestreams.Publisher;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+
+public class DeletedMessageVaultWorkQueueReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler {
+    private final DistributedDeletedMessageVaultDeletionCallback distributedDeletedMessageVaultDeletionCallback;
+
+    @Inject
+    public DeletedMessageVaultWorkQueueReconnectionHandler(DistributedDeletedMessageVaultDeletionCallback distributedDeletedMessageVaultDeletionCallback) {
+        this.distributedDeletedMessageVaultDeletionCallback = distributedDeletedMessageVaultDeletionCallback;
+    }
+
+    @Override
+    public Publisher<Void> handleReconnection(Connection connection) {
+        return Mono.fromRunnable(distributedDeletedMessageVaultDeletionCallback::restart);
+    }
+}
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
index 6be7246..6bed717 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -35,6 +35,7 @@
 
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.core.Username;
 import org.apache.james.lifecycle.api.Startable;
@@ -168,7 +169,7 @@
     private final MailboxId.Factory mailboxIdFactory;
     private final MessageId.Factory messageIdFactory;
     private final BlobId.Factory blobIdFactory;
-    private Receiver receiver;
+    private final ReceiverProvider receiverProvider;
     private Disposable disposable;
 
     @Inject
@@ -178,7 +179,8 @@
                                                           DeletedMessageVaultDeletionCallback callback,
                                                           MailboxId.Factory mailboxIdFactory,
                                                           MessageId.Factory messageIdFactory,
-                                                          BlobId.Factory blobIdFactory) {
+                                                          BlobId.Factory blobIdFactory,
+                                                          ReceiverProvider receiverProvider) {
         this.sender = sender;
         this.rabbitMQConfiguration = rabbitMQConfiguration;
         this.callback = callback;
@@ -187,6 +189,7 @@
         this.blobIdFactory = blobIdFactory;
         this.objectMapper = new ObjectMapper();
         this.channelPool = channelPool;
+        this.receiverProvider = receiverProvider;
     }
 
     public void init() {
@@ -215,17 +218,28 @@
             .then()
             .block();
 
-        receiver = channelPool.createReceiver();
-        disposable = receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS))
+        disposable = consumeDeletedMessageVaultWorkQueue();
+    }
+
+    private Disposable consumeDeletedMessageVaultWorkQueue() {
+        return Flux.using(
+                receiverProvider::createReceiver,
+                receiver -> receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS)),
+                Receiver::close)
             .flatMap(this::handleMessage)
             .subscribeOn(Schedulers.boundedElastic())
             .subscribe();
     }
 
+    public void restart() {
+        Disposable previousConsumer = disposable;
+        disposable = consumeDeletedMessageVaultWorkQueue();
+        previousConsumer.dispose();
+    }
+
     @PreDestroy
     public void stop() {
         Optional.ofNullable(disposable).ifPresent(Disposable::dispose);
-        Optional.ofNullable(receiver).ifPresent(Receiver::close);
     }
 
     private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
index 68c1744..2a2078e 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
@@ -20,6 +20,7 @@
 package org.apache.james.modules.mailbox;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.mailbox.cassandra.DeleteMessageListener;
 import org.apache.james.modules.vault.DeletedMessageVaultModule;
 import org.apache.james.utils.InitializationOperation;
@@ -68,6 +69,9 @@
             .addBinding()
             .to(DistributedDeletedMessageVaultDeletionCallback.class);
         bind(DistributedDeletedMessageVaultDeletionCallback.class).in(Scopes.SINGLETON);
+
+        Multibinder<SimpleConnectionPool.ReconnectionHandler> reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class);
+        reconnectionHandlerMultibinder.addBinding().to(DeletedMessageVaultWorkQueueReconnectionHandler.class);
     }
 
     @ProvidesIntoSet