JAMES-3955 Increase consumer timeout for TaskManagerWorkQueue
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
index 8a1cb2d..7961e0c 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
@@ -42,6 +42,11 @@
return this;
}
+ public Builder consumerTimeout(long consumerTimeoutInMillisecond) {
+ arguments.put("x-consumer-timeout", consumerTimeoutInMillisecond);
+ return this;
+ }
+
public ImmutableMap<String, Object> build() {
return arguments.build();
}
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
index e98f142..617cd53 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
@@ -30,6 +30,8 @@
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -38,6 +40,7 @@
import java.util.stream.Stream;
import org.apache.commons.configuration2.Configuration;
+import org.apache.james.util.DurationParser;
import org.apache.james.util.Host;
import com.google.common.base.Preconditions;
@@ -305,6 +308,8 @@
private static final String SSL_KEY_STORE_PASSWORD = "ssl.keystore.password";
private static final String QUEUE_TTL = "notification.queue.ttl";
+ private static final String TASK_QUEUE_CONSUMER_TIMEOUT = "task.queue.consumer.timeout";
+
public static class ManagementCredentials {
static ManagementCredentials from(Configuration configuration) {
@@ -380,6 +385,7 @@
static final int DEFAULT_SHUTDOWN_TIMEOUT = 10_000;
static final int DEFAULT_NETWORK_RECOVERY_INTERVAL = 5_000;
static final int DEFAULT_PORT = 5672;
+ static final Duration DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT = Duration.ofDays(1);
private final URI amqpUri;
private final URI managementUri;
@@ -398,6 +404,7 @@
private Optional<Integer> quorumQueueReplicationFactor;
private Optional<SSLConfiguration> sslConfiguration;
private Optional<Long> queueTTL;
+ private Optional<Duration> taskQueueConsumerTimeout;
private Builder(URI amqpUri, URI managementUri, ManagementCredentials managementCredentials) {
this.amqpUri = amqpUri;
@@ -417,6 +424,7 @@
this.quorumQueueReplicationFactor = Optional.empty();
this.hosts = ImmutableList.builder();
this.queueTTL = Optional.empty();
+ this.taskQueueConsumerTimeout = Optional.empty();
}
public Builder maxRetries(int maxRetries) {
@@ -497,6 +505,11 @@
return this;
}
+ public Builder taskQueueConsumerTimeout(Optional<Duration> taskQueueConsumerTimeout) {
+ this.taskQueueConsumerTimeout = taskQueueConsumerTimeout;
+ return this;
+ }
+
public RabbitMQConfiguration build() {
Preconditions.checkNotNull(amqpUri, "'amqpUri' should not be null");
Preconditions.checkNotNull(managementUri, "'managementUri' should not be null");
@@ -517,7 +530,8 @@
useQuorumQueues.orElse(false),
quorumQueueReplicationFactor.orElse(DEFAULT_QUORUM_QUEUE_REPLICATION_FACTOR),
hostsDefaultingToUri(),
- queueTTL);
+ queueTTL,
+ taskQueueConsumerTimeout.orElse(DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT));
}
private List<Host> hostsDefaultingToUri() {
@@ -567,6 +581,10 @@
Optional<Long> queueTTL = Optional.ofNullable(configuration.getLong(QUEUE_TTL, null));
ManagementCredentials managementCredentials = ManagementCredentials.from(configuration);
+
+ Optional<Duration> taskQueueConsumerTimeout = Optional.ofNullable(configuration.getString(TASK_QUEUE_CONSUMER_TIMEOUT, null))
+ .map(value -> DurationParser.parse(value, ChronoUnit.SECONDS));
+
return builder()
.amqpUri(amqpUri)
.managementUri(managementUri)
@@ -578,6 +596,7 @@
.quorumQueueReplicationFactor(quorumQueueReplicationFactor)
.hosts(hosts)
.queueTTL(queueTTL)
+ .taskQueueConsumerTimeout(taskQueueConsumerTimeout)
.build();
}
@@ -646,11 +665,13 @@
private final List<Host> hosts;
private final ManagementCredentials managementCredentials;
private final Optional<Long> queueTTL;
+ private final Duration taskQueueConsumerTimeout;
private RabbitMQConfiguration(URI uri, URI managementUri, ManagementCredentials managementCredentials, int maxRetries, int minDelayInMs,
int connectionTimeoutInMs, int channelRpcTimeoutInMs, int handshakeTimeoutInMs, int shutdownTimeoutInMs,
int networkRecoveryIntervalInMs, Boolean useSsl, Boolean useSslManagement, SSLConfiguration sslConfiguration,
- boolean useQuorumQueues, int quorumQueueReplicationFactor, List<Host> hosts, Optional<Long> queueTTL) {
+ boolean useQuorumQueues, int quorumQueueReplicationFactor, List<Host> hosts, Optional<Long> queueTTL,
+ Duration taskQueueConsumerTimeout) {
this.uri = uri;
this.managementUri = managementUri;
this.managementCredentials = managementCredentials;
@@ -668,6 +689,7 @@
this.quorumQueueReplicationFactor = quorumQueueReplicationFactor;
this.hosts = hosts;
this.queueTTL = queueTTL;
+ this.taskQueueConsumerTimeout = taskQueueConsumerTimeout;
}
public URI getUri() {
@@ -739,6 +761,10 @@
return queueTTL;
}
+ public Duration getTaskQueueConsumerTimeout() {
+ return taskQueueConsumerTimeout;
+ }
+
@Override
public final boolean equals(Object o) {
if (o instanceof RabbitMQConfiguration) {
@@ -760,7 +786,8 @@
&& Objects.equals(this.useSslManagement, that.useSslManagement)
&& Objects.equals(this.sslConfiguration, that.sslConfiguration)
&& Objects.equals(this.hosts, that.hosts)
- && Objects.equals(this.queueTTL, that.queueTTL);
+ && Objects.equals(this.queueTTL, that.queueTTL)
+ && Objects.equals(this.taskQueueConsumerTimeout, that.taskQueueConsumerTimeout);
}
return false;
}
@@ -768,6 +795,7 @@
@Override
public final int hashCode() {
return Objects.hash(uri, managementUri, maxRetries, minDelayInMs, connectionTimeoutInMs, quorumQueueReplicationFactor, useQuorumQueues, hosts,
- channelRpcTimeoutInMs, handshakeTimeoutInMs, shutdownTimeoutInMs, networkRecoveryIntervalInMs, managementCredentials, useSsl, useSslManagement, sslConfiguration, queueTTL);
+ channelRpcTimeoutInMs, handshakeTimeoutInMs, shutdownTimeoutInMs, networkRecoveryIntervalInMs, managementCredentials, useSsl, useSslManagement,
+ sslConfiguration, queueTTL, taskQueueConsumerTimeout);
}
}
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
index 49fd796..0f175b8 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
@@ -455,6 +456,60 @@
.isInstanceOf(ConversionException.class);
}
+ @Test
+ void emptyTaskQueueConsumerTimeoutShouldDefaultToOneDay() {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
+ configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
+ configuration.addProperty("management.user", DEFAULT_USER);
+ configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);
+
+ assertThat(RabbitMQConfiguration.from(configuration).getTaskQueueConsumerTimeout())
+ .isEqualTo(Duration.ofDays(1));
+ }
+
+ @Test
+ void parseValidTaskQueueConsumerTimeoutShouldSucceed() {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
+ configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
+ configuration.addProperty("management.user", DEFAULT_USER);
+ configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);
+
+ configuration.addProperty("task.queue.consumer.timeout", "2day");
+
+ assertThat(RabbitMQConfiguration.from(configuration).getTaskQueueConsumerTimeout())
+ .isEqualTo(Duration.ofDays(2));
+ }
+
+ @Test
+ void parseTaskQueueConsumerTimeoutWithoutTimeUnitShouldDefaultToSecond() {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
+ configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
+ configuration.addProperty("management.user", DEFAULT_USER);
+ configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);
+
+ configuration.addProperty("task.queue.consumer.timeout", "3600");
+
+ assertThat(RabbitMQConfiguration.from(configuration).getTaskQueueConsumerTimeout())
+ .isEqualTo(Duration.ofSeconds(3600));
+ }
+
+ @Test
+ void parseInvalidTaskQueueConsumerTimeoutShouldFail() {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
+ configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
+ configuration.addProperty("management.user", DEFAULT_USER);
+ configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);
+
+ configuration.addProperty("task.queue.consumer.timeout", "invalid");
+
+ assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration))
+ .isInstanceOf(NumberFormatException.class);
+ }
+
@Nested
class ManagementCredentialsTest {
@Test
diff --git a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
index 610baa1..82273e5 100644
--- a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
+++ b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
@@ -155,4 +155,19 @@
Disable with caution (this only makes sense in a distributed setup where other nodes consume tasks).
Defaults to true.
+Limitation: Sometimes, some tasks running on James can be very heavy and take a couple of hours to complete.
+If other tasks are being triggered meanwhile on WebAdmin, they go on the TaskManagerWorkQueue and James unack them,
+telling RabbitMQ it will consume them later. If they don't get consumed before the consumer timeout setup in
+RabbitMQ (default being 30 minutes), RabbitMQ closes the channel on an exception. It is thus advised to declare a
+longer timeout in rabbitmq.conf. More https://www.rabbitmq.com/consumers.html#acknowledgement-timeout[here].
+
+| task.queue.consumer.timeout
+| Task queue consumer timeout.
+
+Optional. Duration (support multiple time units cf `DurationParser`), defaults to 1 day.
+
+Required at least RabbitMQ version 3.12 to have effect.
+This is used to avoid the task queue consumer (which could run very long tasks) being disconnected by RabbitMQ after the default acknowledgement timeout 30 minutes.
+References: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout.
+
|===
\ No newline at end of file
diff --git a/server/apps/distributed-app/sample-configuration/rabbitmq.properties b/server/apps/distributed-app/sample-configuration/rabbitmq.properties
index 5ce2298..d63f6c0 100644
--- a/server/apps/distributed-app/sample-configuration/rabbitmq.properties
+++ b/server/apps/distributed-app/sample-configuration/rabbitmq.properties
@@ -78,6 +78,11 @@
# Defaults to true.
task.consumption.enabled=true
+# Configure task queue consumer timeout. References: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout. Required at least RabbitMQ version 3.12 to have effect.
+# This is used to avoid the task queue consumer (which could run very long tasks) being disconnected by RabbitMQ after the default acknowledgement timeout 30 minutes.
+# Optional. Duration (support multiple time units cf `DurationParser`), defaults to 1 day.
+#task.queue.consumer.timeout=1day
+
# Configure queue ttl (in ms). References: https://www.rabbitmq.com/ttl.html#queue-ttl.
# This is used only on queues used to share notification patterns, are exclusive to a node. If omitted, it will not add the TTL configure when declaring queues.
# Optional integer, defaults is 3600000.
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 08a1da8..1cfe96d 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -125,6 +125,7 @@
.durable(true)
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!AUTO_DELETE_QUEUE)
.singleActiveConsumer()
+ .consumerTimeout(rabbitMQConfiguration.getTaskQueueConsumerTimeout().toMillis())
.build()))
.retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
Mono<AMQP.Queue.BindOk> bindQueueToExchange = sender
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 8df0172..84d7c87 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -30,6 +30,7 @@
import java.util.stream.IntStream;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.server.task.json.TestTask;
import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
@@ -63,7 +64,7 @@
private RabbitMQWorkQueue testee;
private ImmediateWorker worker;
private JsonTaskSerializer serializer;
-
+ private RabbitMQManagementAPI managementAPI;
@BeforeEach
void setUp() throws Exception {
@@ -72,6 +73,7 @@
testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(),
rabbitMQExtension.getRabbitMQ().getConfiguration());
testee.start();
+ managementAPI = rabbitMQExtension.managementAPI();
}
@AfterEach
@@ -80,6 +82,14 @@
}
@Test
+ void shouldSetConsumerTimeoutArgumentOnTaskQueue() {
+ assertThat(managementAPI.queueDetails("/", "taskManagerWorkQueue")
+ .getArguments()
+ .get("x-consumer-timeout"))
+ .isEqualTo("86400000");
+ }
+
+ @Test
void workQueueShouldConsumeSubmittedTask() {
testee.submit(TASK_WITH_ID);
await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !worker.results.isEmpty());
diff --git a/src/site/xdoc/server/config-rabbitmq.xml b/src/site/xdoc/server/config-rabbitmq.xml
index 4f3f9bd..76e504d 100644
--- a/src/site/xdoc/server/config-rabbitmq.xml
+++ b/src/site/xdoc/server/config-rabbitmq.xml
@@ -194,6 +194,16 @@
Disable with caution (this only makes sense in a distributed setup where other nodes consume tasks).
Defaults to true.
</dd>
+ <dt><strong>task.queue.consumer.timeout</strong></dt>
+ <dd>
+ Task queue consumer timeout.
+
+ Optional. Duration (support multiple time units cf `DurationParser`), defaults to 1 day.
+
+ Required at least RabbitMQ version 3.12 to have effect.
+ This is used to avoid the task queue consumer (which could run very long tasks) being disconnected by RabbitMQ after the default acknowledgement timeout 30 minutes.
+ References: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout.
+ </dd>
</dl>
</section>
diff --git a/upgrade-instructions.md b/upgrade-instructions.md
index 1716725..74d00f1 100644
--- a/upgrade-instructions.md
+++ b/upgrade-instructions.md
@@ -29,6 +29,22 @@
- [Restrict listening interface to loopback by default for webadmin](#restrict-listening-interface-to-loopback-by-default-for-webadmin)
- [TLS host name verification is now enabled by default](#tls-host-name-verification-is-now-enabled-by-default)
+### Increase RabbitMQ consumer timeout on the task queue
+
+Date: 11/20/2023
+
+JIRA: https://issues.apache.org/jira/browse/JAMES-3955
+
+Before, the RabbitMQ default acknowledgement timeout on the `taskManagerWorkQueue` was 30 minutes.
+This is not enough for long-running tasks, therefore we advise to increase the acknowledgement timeout and now enforce it to 1 day by default.
+
+rf: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout
+
+To make the per queue argument feature work, update your RabbitMQ to version 3.12 at least.
+
+If you want the old behavior where consumer timeout defaults to 30 minutes, you can either not upgrade your RabbitMQ or set `task.queue.consumer.timeout=30minutes` in `rabbitmq.properties`.
+
+
### TLS host name verification is now enabled by default
Date: 06/10/2022