JAMES-3605 Implement DeletedMessageVaultWorkQueueReconnectionHandler
So the consumer for deleted message vault queue could reconnect if needed.
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
new file mode 100644
index 0000000..0893a66
--- /dev/null
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
@@ -0,0 +1,43 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.modules.mailbox;
+
+import jakarta.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.reactivestreams.Publisher;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+
+public class DeletedMessageVaultWorkQueueReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler {
+ private final DistributedDeletedMessageVaultDeletionCallback distributedDeletedMessageVaultDeletionCallback;
+
+ @Inject
+ public DeletedMessageVaultWorkQueueReconnectionHandler(DistributedDeletedMessageVaultDeletionCallback distributedDeletedMessageVaultDeletionCallback) {
+ this.distributedDeletedMessageVaultDeletionCallback = distributedDeletedMessageVaultDeletionCallback;
+ }
+
+ @Override
+ public Publisher<Void> handleReconnection(Connection connection) {
+ return Mono.fromRunnable(distributedDeletedMessageVaultDeletionCallback::restart);
+ }
+}
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
index 6be7246..6bed717 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -35,6 +35,7 @@
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.blob.api.BlobId;
import org.apache.james.core.Username;
import org.apache.james.lifecycle.api.Startable;
@@ -168,7 +169,7 @@
private final MailboxId.Factory mailboxIdFactory;
private final MessageId.Factory messageIdFactory;
private final BlobId.Factory blobIdFactory;
- private Receiver receiver;
+ private final ReceiverProvider receiverProvider;
private Disposable disposable;
@Inject
@@ -178,7 +179,8 @@
DeletedMessageVaultDeletionCallback callback,
MailboxId.Factory mailboxIdFactory,
MessageId.Factory messageIdFactory,
- BlobId.Factory blobIdFactory) {
+ BlobId.Factory blobIdFactory,
+ ReceiverProvider receiverProvider) {
this.sender = sender;
this.rabbitMQConfiguration = rabbitMQConfiguration;
this.callback = callback;
@@ -187,6 +189,7 @@
this.blobIdFactory = blobIdFactory;
this.objectMapper = new ObjectMapper();
this.channelPool = channelPool;
+ this.receiverProvider = receiverProvider;
}
public void init() {
@@ -215,17 +218,28 @@
.then()
.block();
- receiver = channelPool.createReceiver();
- disposable = receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS))
+ disposable = consumeDeletedMessageVaultWorkQueue();
+ }
+
+ private Disposable consumeDeletedMessageVaultWorkQueue() {
+ return Flux.using(
+ receiverProvider::createReceiver,
+ receiver -> receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS)),
+ Receiver::close)
.flatMap(this::handleMessage)
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
+ public void restart() {
+ Disposable previousConsumer = disposable;
+ disposable = consumeDeletedMessageVaultWorkQueue();
+ previousConsumer.dispose();
+ }
+
@PreDestroy
public void stop() {
Optional.ofNullable(disposable).ifPresent(Disposable::dispose);
- Optional.ofNullable(receiver).ifPresent(Receiver::close);
}
private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
index 68c1744..2a2078e 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
@@ -20,6 +20,7 @@
package org.apache.james.modules.mailbox;
import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.mailbox.cassandra.DeleteMessageListener;
import org.apache.james.modules.vault.DeletedMessageVaultModule;
import org.apache.james.utils.InitializationOperation;
@@ -68,6 +69,9 @@
.addBinding()
.to(DistributedDeletedMessageVaultDeletionCallback.class);
bind(DistributedDeletedMessageVaultDeletionCallback.class).in(Scopes.SINGLETON);
+
+ Multibinder<SimpleConnectionPool.ReconnectionHandler> reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class);
+ reconnectionHandlerMultibinder.addBinding().to(DeletedMessageVaultWorkQueueReconnectionHandler.class);
}
@ProvidesIntoSet