JAMES-3955 Increase consumer timeout for TaskManagerWorkQueue
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
index 8a1cb2d..7961e0c 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
@@ -42,6 +42,11 @@
             return this;
         }
 
+        public Builder consumerTimeout(long consumerTimeoutInMillisecond) {
+            arguments.put("x-consumer-timeout", consumerTimeoutInMillisecond);
+            return this;
+        }
+
         public ImmutableMap<String, Object> build() {
             return arguments.build();
         }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
index e98f142..617cd53 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
@@ -30,6 +30,8 @@
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -38,6 +40,7 @@
 import java.util.stream.Stream;
 
 import org.apache.commons.configuration2.Configuration;
+import org.apache.james.util.DurationParser;
 import org.apache.james.util.Host;
 
 import com.google.common.base.Preconditions;
@@ -305,6 +308,8 @@
     private static final String SSL_KEY_STORE_PASSWORD = "ssl.keystore.password";
     private static final String QUEUE_TTL = "notification.queue.ttl";
 
+    private static final String TASK_QUEUE_CONSUMER_TIMEOUT = "task.queue.consumer.timeout";
+
     public static class ManagementCredentials {
 
         static ManagementCredentials from(Configuration configuration) {
@@ -380,6 +385,7 @@
         static final int DEFAULT_SHUTDOWN_TIMEOUT = 10_000;
         static final int DEFAULT_NETWORK_RECOVERY_INTERVAL = 5_000;
         static final int DEFAULT_PORT = 5672;
+        static final Duration DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT = Duration.ofDays(1);
 
         private final URI amqpUri;
         private final URI managementUri;
@@ -398,6 +404,7 @@
         private Optional<Integer> quorumQueueReplicationFactor;
         private Optional<SSLConfiguration> sslConfiguration;
         private Optional<Long> queueTTL;
+        private Optional<Duration> taskQueueConsumerTimeout;
 
         private Builder(URI amqpUri, URI managementUri, ManagementCredentials managementCredentials) {
             this.amqpUri = amqpUri;
@@ -417,6 +424,7 @@
             this.quorumQueueReplicationFactor = Optional.empty();
             this.hosts = ImmutableList.builder();
             this.queueTTL = Optional.empty();
+            this.taskQueueConsumerTimeout = Optional.empty();
         }
 
         public Builder maxRetries(int maxRetries) {
@@ -497,6 +505,11 @@
             return this;
         }
 
+        public Builder taskQueueConsumerTimeout(Optional<Duration> taskQueueConsumerTimeout) {
+            this.taskQueueConsumerTimeout = taskQueueConsumerTimeout;
+            return this;
+        }
+
         public RabbitMQConfiguration build() {
             Preconditions.checkNotNull(amqpUri, "'amqpUri' should not be null");
             Preconditions.checkNotNull(managementUri, "'managementUri' should not be null");
@@ -517,7 +530,8 @@
                     useQuorumQueues.orElse(false),
                     quorumQueueReplicationFactor.orElse(DEFAULT_QUORUM_QUEUE_REPLICATION_FACTOR),
                     hostsDefaultingToUri(),
-                    queueTTL);
+                    queueTTL,
+                    taskQueueConsumerTimeout.orElse(DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT));
         }
 
         private List<Host> hostsDefaultingToUri() {
@@ -567,6 +581,10 @@
         Optional<Long> queueTTL = Optional.ofNullable(configuration.getLong(QUEUE_TTL, null));
 
         ManagementCredentials managementCredentials = ManagementCredentials.from(configuration);
+
+        Optional<Duration> taskQueueConsumerTimeout = Optional.ofNullable(configuration.getString(TASK_QUEUE_CONSUMER_TIMEOUT, null))
+            .map(value -> DurationParser.parse(value, ChronoUnit.SECONDS));
+
         return builder()
             .amqpUri(amqpUri)
             .managementUri(managementUri)
@@ -578,6 +596,7 @@
             .quorumQueueReplicationFactor(quorumQueueReplicationFactor)
             .hosts(hosts)
             .queueTTL(queueTTL)
+            .taskQueueConsumerTimeout(taskQueueConsumerTimeout)
             .build();
     }
 
@@ -646,11 +665,13 @@
     private final List<Host> hosts;
     private final ManagementCredentials managementCredentials;
     private final Optional<Long> queueTTL;
+    private final Duration taskQueueConsumerTimeout;
 
     private RabbitMQConfiguration(URI uri, URI managementUri, ManagementCredentials managementCredentials, int maxRetries, int minDelayInMs,
                                   int connectionTimeoutInMs, int channelRpcTimeoutInMs, int handshakeTimeoutInMs, int shutdownTimeoutInMs,
                                   int networkRecoveryIntervalInMs, Boolean useSsl, Boolean useSslManagement, SSLConfiguration sslConfiguration,
-                                  boolean useQuorumQueues, int quorumQueueReplicationFactor, List<Host> hosts, Optional<Long> queueTTL) {
+                                  boolean useQuorumQueues, int quorumQueueReplicationFactor, List<Host> hosts, Optional<Long> queueTTL,
+                                  Duration taskQueueConsumerTimeout) {
         this.uri = uri;
         this.managementUri = managementUri;
         this.managementCredentials = managementCredentials;
@@ -668,6 +689,7 @@
         this.quorumQueueReplicationFactor = quorumQueueReplicationFactor;
         this.hosts = hosts;
         this.queueTTL = queueTTL;
+        this.taskQueueConsumerTimeout = taskQueueConsumerTimeout;
     }
 
     public URI getUri() {
@@ -739,6 +761,10 @@
         return queueTTL;
     }
 
+    public Duration getTaskQueueConsumerTimeout() {
+        return taskQueueConsumerTimeout;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof RabbitMQConfiguration) {
@@ -760,7 +786,8 @@
                 && Objects.equals(this.useSslManagement, that.useSslManagement)
                 && Objects.equals(this.sslConfiguration, that.sslConfiguration)
                 && Objects.equals(this.hosts, that.hosts)
-                && Objects.equals(this.queueTTL, that.queueTTL);
+                && Objects.equals(this.queueTTL, that.queueTTL)
+                && Objects.equals(this.taskQueueConsumerTimeout, that.taskQueueConsumerTimeout);
         }
         return false;
     }
@@ -768,6 +795,7 @@
     @Override
     public final int hashCode() {
         return Objects.hash(uri, managementUri, maxRetries, minDelayInMs, connectionTimeoutInMs, quorumQueueReplicationFactor, useQuorumQueues, hosts,
-            channelRpcTimeoutInMs, handshakeTimeoutInMs, shutdownTimeoutInMs, networkRecoveryIntervalInMs, managementCredentials, useSsl, useSslManagement, sslConfiguration, queueTTL);
+            channelRpcTimeoutInMs, handshakeTimeoutInMs, shutdownTimeoutInMs, networkRecoveryIntervalInMs, managementCredentials, useSsl, useSslManagement,
+            sslConfiguration, queueTTL, taskQueueConsumerTimeout);
     }
 }
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
index 49fd796..0f175b8 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Optional;
 
@@ -455,6 +456,60 @@
             .isInstanceOf(ConversionException.class);
     }
 
+    @Test
+    void emptyTaskQueueConsumerTimeoutShouldDefaultToOneDay() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
+        configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
+        configuration.addProperty("management.user", DEFAULT_USER);
+        configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);
+
+        assertThat(RabbitMQConfiguration.from(configuration).getTaskQueueConsumerTimeout())
+            .isEqualTo(Duration.ofDays(1));
+    }
+
+    @Test
+    void parseValidTaskQueueConsumerTimeoutShouldSucceed() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
+        configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
+        configuration.addProperty("management.user", DEFAULT_USER);
+        configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);
+
+        configuration.addProperty("task.queue.consumer.timeout", "2day");
+
+        assertThat(RabbitMQConfiguration.from(configuration).getTaskQueueConsumerTimeout())
+            .isEqualTo(Duration.ofDays(2));
+    }
+
+    @Test
+    void parseTaskQueueConsumerTimeoutWithoutTimeUnitShouldDefaultToSecond() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
+        configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
+        configuration.addProperty("management.user", DEFAULT_USER);
+        configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);
+
+        configuration.addProperty("task.queue.consumer.timeout", "3600");
+
+        assertThat(RabbitMQConfiguration.from(configuration).getTaskQueueConsumerTimeout())
+            .isEqualTo(Duration.ofSeconds(3600));
+    }
+
+    @Test
+    void parseInvalidTaskQueueConsumerTimeoutShouldFail() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
+        configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
+        configuration.addProperty("management.user", DEFAULT_USER);
+        configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);
+
+        configuration.addProperty("task.queue.consumer.timeout", "invalid");
+
+        assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration))
+            .isInstanceOf(NumberFormatException.class);
+    }
+
     @Nested
     class ManagementCredentialsTest {
         @Test
diff --git a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
index 610baa1..82273e5 100644
--- a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
+++ b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
@@ -155,4 +155,19 @@
 Disable with caution (this only makes sense in a distributed setup where other nodes consume tasks).
 Defaults to true.
 
+Limitation: Sometimes, some tasks running on James can be very heavy and take a couple of hours to complete.
+If other tasks are being triggered meanwhile on WebAdmin, they go on the TaskManagerWorkQueue and James unack them,
+telling RabbitMQ it will consume them later. If they don't get consumed before the consumer timeout setup in
+RabbitMQ (default being 30 minutes), RabbitMQ closes the channel on an exception. It is thus advised to declare a
+longer timeout in rabbitmq.conf. More https://www.rabbitmq.com/consumers.html#acknowledgement-timeout[here].
+
+| task.queue.consumer.timeout
+| Task queue consumer timeout.
+
+Optional. Duration (support multiple time units cf `DurationParser`), defaults to 1 day.
+
+Required at least RabbitMQ version 3.12 to have effect.
+This is used to avoid the task queue consumer (which could run very long tasks) being disconnected by RabbitMQ after the default acknowledgement timeout 30 minutes.
+References: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout.
+
 |===
\ No newline at end of file
diff --git a/server/apps/distributed-app/sample-configuration/rabbitmq.properties b/server/apps/distributed-app/sample-configuration/rabbitmq.properties
index 5ce2298..d63f6c0 100644
--- a/server/apps/distributed-app/sample-configuration/rabbitmq.properties
+++ b/server/apps/distributed-app/sample-configuration/rabbitmq.properties
@@ -78,6 +78,11 @@
 # Defaults to true.
 task.consumption.enabled=true
 
+# Configure task queue consumer timeout. References: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout. Required at least RabbitMQ version 3.12 to have effect.
+# This is used to avoid the task queue consumer (which could run very long tasks) being disconnected by RabbitMQ after the default acknowledgement timeout 30 minutes.
+# Optional. Duration (support multiple time units cf `DurationParser`), defaults to 1 day.
+#task.queue.consumer.timeout=1day
+
 # Configure queue ttl (in ms). References: https://www.rabbitmq.com/ttl.html#queue-ttl.
 # This is used only on queues used to share notification patterns, are exclusive to a node. If omitted, it will not add the TTL configure when declaring queues.
 # Optional integer, defaults is 3600000.
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 08a1da8..1cfe96d 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -125,6 +125,7 @@
                 .durable(true)
                 .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!AUTO_DELETE_QUEUE)
                     .singleActiveConsumer()
+                    .consumerTimeout(rabbitMQConfiguration.getTaskQueueConsumerTimeout().toMillis())
                     .build()))
             .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
         Mono<AMQP.Queue.BindOk> bindQueueToExchange = sender
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 8df0172..84d7c87 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -30,6 +30,7 @@
 import java.util.stream.IntStream;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.TestTask;
 import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
@@ -63,7 +64,7 @@
     private RabbitMQWorkQueue testee;
     private ImmediateWorker worker;
     private JsonTaskSerializer serializer;
-
+    private RabbitMQManagementAPI managementAPI;
 
     @BeforeEach
     void setUp() throws Exception {
@@ -72,6 +73,7 @@
         testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(),
             rabbitMQExtension.getRabbitMQ().getConfiguration());
         testee.start();
+        managementAPI = rabbitMQExtension.managementAPI();
     }
 
     @AfterEach
@@ -80,6 +82,14 @@
     }
 
     @Test
+    void shouldSetConsumerTimeoutArgumentOnTaskQueue() {
+        assertThat(managementAPI.queueDetails("/", "taskManagerWorkQueue")
+            .getArguments()
+            .get("x-consumer-timeout"))
+            .isEqualTo("86400000");
+    }
+
+    @Test
     void workQueueShouldConsumeSubmittedTask() {
         testee.submit(TASK_WITH_ID);
         await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !worker.results.isEmpty());
diff --git a/src/site/xdoc/server/config-rabbitmq.xml b/src/site/xdoc/server/config-rabbitmq.xml
index 4f3f9bd..76e504d 100644
--- a/src/site/xdoc/server/config-rabbitmq.xml
+++ b/src/site/xdoc/server/config-rabbitmq.xml
@@ -194,6 +194,16 @@
               Disable with caution (this only makes sense in a distributed setup where other nodes consume tasks).
               Defaults to true.
           </dd>
+          <dt><strong>task.queue.consumer.timeout</strong></dt>
+          <dd>
+              Task queue consumer timeout.
+
+              Optional. Duration (support multiple time units cf `DurationParser`), defaults to 1 day.
+
+              Required at least RabbitMQ version 3.12 to have effect.
+              This is used to avoid the task queue consumer (which could run very long tasks) being disconnected by RabbitMQ after the default acknowledgement timeout 30 minutes.
+              References: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout.
+          </dd>
       </dl>
   </section>
 
diff --git a/upgrade-instructions.md b/upgrade-instructions.md
index 1716725..74d00f1 100644
--- a/upgrade-instructions.md
+++ b/upgrade-instructions.md
@@ -29,6 +29,22 @@
  - [Restrict listening interface to loopback by default for webadmin](#restrict-listening-interface-to-loopback-by-default-for-webadmin)
  - [TLS host name verification is now enabled by default](#tls-host-name-verification-is-now-enabled-by-default)
 
+### Increase RabbitMQ consumer timeout on the task queue
+
+Date: 11/20/2023
+
+JIRA: https://issues.apache.org/jira/browse/JAMES-3955
+
+Before, the RabbitMQ default acknowledgement timeout on the `taskManagerWorkQueue` was 30 minutes.
+This is not enough for long-running tasks, therefore we advise to increase the acknowledgement timeout and now enforce it to 1 day by default.
+
+rf: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout
+
+To make the per queue argument feature work, update your RabbitMQ to version 3.12 at least.
+
+If you want the old behavior where consumer timeout defaults to 30 minutes, you can either not upgrade your RabbitMQ or set `task.queue.consumer.timeout=30minutes` in `rabbitmq.properties`.
+
+
 ### TLS host name verification is now enabled by default
 
 Date: 06/10/2022