[ENHANCEMENT] Metrics for rabbitmq channel pool (#2232)
Will ease channel pool sizing as well as debugging of RabbitMQ channelPool
related errors.
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index f76f098..8560035 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -33,6 +33,7 @@
import jakarta.annotation.PreDestroy;
import org.apache.james.lifecycle.api.Startable;
+import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -671,7 +672,8 @@
private final MetricFactory metricFactory;
private Sender sender;
- public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, Configuration configuration, MetricFactory metricFactory) {
+ public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, Configuration configuration, MetricFactory metricFactory,
+ GaugeRegistry gaugeRegistry) {
this.connectionMono = connectionMono;
this.configuration = configuration;
this.metricFactory = metricFactory;
@@ -697,6 +699,11 @@
.then()
.subscribeOn(Schedulers.boundedElastic()))
.buildPool();
+
+ gaugeRegistry.register("rabbitmq.channels.acquired.size", () -> newPool.metrics().acquiredSize());
+ gaugeRegistry.register("rabbitmq.channels.allocated.size", () -> newPool.metrics().allocatedSize());
+ gaugeRegistry.register("rabbitmq.channels.idle.size", () -> newPool.metrics().idleSize());
+ gaugeRegistry.register("rabbitmq.channels.pending.aquire.size", () -> newPool.metrics().pendingAcquireSize());
}
private Mono<? extends Channel> openChannel(Connection connection) {
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
index caf84cb..e769525 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
@@ -24,6 +24,7 @@
import java.time.Duration;
import java.util.function.Consumer;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
@@ -151,7 +152,8 @@
.retries(2)
.maxBorrowDelay(Duration.ofMillis(250))
.maxChannel(10),
- new RecordingMetricFactory());
+ new RecordingMetricFactory(),
+ new NoopGaugeRegistry());
channelPool.start();
}
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
index 766ccbb..3866638 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
@@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.awaitility.Awaitility;
@@ -76,7 +77,8 @@
.retries(2)
.maxBorrowDelay(Duration.ofSeconds(2))
.maxChannel(poolSize),
- new RecordingMetricFactory());
+ new RecordingMetricFactory(),
+ new NoopGaugeRegistry());
reactorRabbitMQChannelPool.start();
return reactorRabbitMQChannelPool;
}
diff --git a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
index 19009a7..a40509d 100644
--- a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
+++ b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
@@ -36,6 +36,7 @@
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeName;
import org.apache.mailet.AttributeValue;
@@ -138,7 +139,7 @@
.initialDelay(Duration.ofMillis(5)));
reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
ReactorRabbitMQChannelPool.Configuration.DEFAULT,
- metricFactory);
+ metricFactory, new NoopGaugeRegistry());
reactorRabbitMQChannelPool.start();
sender = reactorRabbitMQChannelPool.getSender();
sender.declareExchange(ExchangeSpecification.exchange(exchange));
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index 8aab97c..7a20f78 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -47,6 +47,7 @@
import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
import org.apache.james.mailbox.store.StoreSubscriptionManager;
import org.apache.james.mailbox.store.quota.DefaultUserQuotaRootResolver;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.james.metrics.logger.DefaultMetricFactory;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.mpt.api.ImapFeatures;
@@ -82,7 +83,7 @@
.initialDelay(Duration.ofMillis(5)));
reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
ReactorRabbitMQChannelPool.Configuration.DEFAULT,
- new DefaultMetricFactory());
+ new DefaultMetricFactory(), new NoopGaugeRegistry());
reactorRabbitMQChannelPool.start();
eventBus = createEventBus();
eventBus.start();
diff --git a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
index 4f722f5..856761f 100644
--- a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
@@ -32,6 +32,7 @@
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.utils.InitializationOperation;
import org.apache.james.utils.InitilizationOperationBuilder;
@@ -94,11 +95,12 @@
@Singleton
ReactorRabbitMQChannelPool provideReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool,
ReactorRabbitMQChannelPool.Configuration configuration,
- MetricFactory metricFactory) {
+ MetricFactory metricFactory, GaugeRegistry gaugeRegistry) {
ReactorRabbitMQChannelPool channelPool = new ReactorRabbitMQChannelPool(
simpleConnectionPool.getResilientConnection(),
configuration,
- metricFactory);
+ metricFactory,
+ gaugeRegistry);
channelPool.start();
return channelPool;
}