JAMES-2586 Create metrics for PostgresExecutor
diff --git a/backends-common/postgres/pom.xml b/backends-common/postgres/pom.xml
index 437c49b..b3477fa 100644
--- a/backends-common/postgres/pom.xml
+++ b/backends-common/postgres/pom.xml
@@ -54,6 +54,10 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>metrics-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>testing-base</artifactId>
<scope>test</scope>
</dependency>
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index cb63a7e..879708a 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -32,6 +32,7 @@
import org.apache.james.backends.postgres.PostgresConfiguration;
import org.apache.james.core.Domain;
+import org.apache.james.metrics.api.MetricFactory;
import org.jooq.DSLContext;
import org.jooq.DeleteResultStep;
import org.jooq.Record;
@@ -66,16 +67,19 @@
private final JamesPostgresConnectionFactory jamesPostgresConnectionFactory;
private final PostgresConfiguration postgresConfiguration;
+ private final MetricFactory metricFactory;
@Inject
public Factory(JamesPostgresConnectionFactory jamesPostgresConnectionFactory,
- PostgresConfiguration postgresConfiguration) {
+ PostgresConfiguration postgresConfiguration,
+ MetricFactory metricFactory) {
this.jamesPostgresConnectionFactory = jamesPostgresConnectionFactory;
this.postgresConfiguration = postgresConfiguration;
+ this.metricFactory = metricFactory;
}
public PostgresExecutor create(Optional<Domain> domain) {
- return new PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain), postgresConfiguration);
+ return new PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain), postgresConfiguration, metricFactory);
}
public PostgresExecutor create() {
@@ -90,11 +94,14 @@
private final Mono<Connection> connection;
private final PostgresConfiguration postgresConfiguration;
+ private final MetricFactory metricFactory;
private PostgresExecutor(Mono<Connection> connection,
- PostgresConfiguration postgresConfiguration) {
+ PostgresConfiguration postgresConfiguration,
+ MetricFactory metricFactory) {
this.connection = connection;
this.postgresConfiguration = postgresConfiguration;
+ this.metricFactory = metricFactory;
}
public Mono<DSLContext> dslContext() {
@@ -102,44 +109,48 @@
}
public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) {
- return dslContext()
- .flatMap(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()))
- .then();
+ return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMap(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))
+ .then()));
}
public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> queryFunction) {
- return dslContext()
- .flatMapMany(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .collectList()
- .flatMapIterable(list -> list) // Mitigation fix for https://github.com/jOOQ/jOOQ/issues/16556
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()));
+ return Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMapMany(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .collectList()
+ .flatMapIterable(list -> list) // Mitigation fix for https://github.com/jOOQ/jOOQ/issues/16556
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))));
}
public Flux<Record> executeDeleteAndReturnList(Function<DSLContext, DeleteResultStep<Record>> queryFunction) {
- return dslContext()
- .flatMapMany(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .collectList()
- .flatMapIterable(list -> list) // The convert Flux -> Mono<List> -> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()));
+ return Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMapMany(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .collectList()
+ .flatMapIterable(list -> list) // Convert Flux -> Mono<List> -> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))));
}
public Mono<Record> executeRow(Function<DSLContext, Publisher<Record>> queryFunction) {
- return dslContext()
- .flatMap(queryFunction.andThen(Mono::from))
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()));
+ return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMap(queryFunction.andThen(Mono::from))
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))));
}
public Mono<Optional<Record>> executeSingleRowOptional(Function<DSLContext, Publisher<Record>> queryFunction) {
@@ -149,13 +160,14 @@
}
public Mono<Integer> executeCount(Function<DSLContext, Mono<Record1<Integer>>> queryFunction) {
- return dslContext()
- .flatMap(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()))
- .map(Record1::value1);
+ return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMap(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))
+ .map(Record1::value1)));
}
public Mono<Boolean> executeExists(Function<DSLContext, SelectConditionStep<?>> queryFunction) {
@@ -164,12 +176,13 @@
}
public Mono<Integer> executeReturnAffectedRowsCount(Function<DSLContext, Mono<Integer>> queryFunction) {
- return dslContext()
- .flatMap(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()));
+ return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMap(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))));
}
public Mono<Connection> connection() {
diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
index 88c2124..35a4e9b 100644
--- a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
+++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
@@ -30,6 +30,7 @@
import org.apache.james.backends.postgres.utils.DomainImplPostgresConnectionFactory;
import org.apache.james.backends.postgres.utils.PostgresExecutor;
import org.apache.james.backends.postgres.utils.SinglePostgresConnectionFactory;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -139,12 +140,13 @@
.build());
if (rlsEnabled) {
- executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(connectionFactory), postgresConfiguration);
+ executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(connectionFactory), postgresConfiguration, new RecordingMetricFactory());
} else {
executorFactory = new PostgresExecutor.Factory(new SinglePostgresConnectionFactory(connectionFactory.create()
.cache()
.cast(Connection.class).block()),
- postgresConfiguration);
+ postgresConfiguration,
+ new RecordingMetricFactory());
}
postgresExecutor = executorFactory.create();
@@ -154,7 +156,7 @@
.password(postgresConfiguration.getNonRLSCredential().getPassword())
.build())
.flatMap(configuration -> new PostgresqlConnectionFactory(configuration).create().cache())
- .map(connection -> new PostgresExecutor.Factory(new SinglePostgresConnectionFactory(connection), postgresConfiguration).create())
+ .map(connection -> new PostgresExecutor.Factory(new SinglePostgresConnectionFactory(connection), postgresConfiguration, new RecordingMetricFactory()).create())
.block();
} else {
nonRLSPostgresExecutor = postgresExecutor;
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
index c6dced4..f23a8c0 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
@@ -42,6 +42,7 @@
import org.apache.james.mailbox.postgres.PostgresMailboxSessionMapperFactory;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapper;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
import org.apache.james.utils.UpdatableTickingClock;
import org.junit.jupiter.api.BeforeEach;
@@ -73,7 +74,7 @@
public void setUp() {
BlobId.Factory blobIdFactory = new HashBlobId.Factory();
postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
- postgresExtension.getPostgresConfiguration()),
+ postgresExtension.getPostgresConfiguration(), new RecordingMetricFactory()),
new UpdatableTickingClock(Instant.now()),
new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory),
blobIdFactory);
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
index 4680c58..cfc7dcc 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
@@ -31,6 +31,7 @@
import org.apache.james.mailbox.model.UidValidity;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapperFactory;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -45,7 +46,8 @@
@BeforeEach
public void setUp() {
PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
- postgresExtension.getPostgresConfiguration());
+ postgresExtension.getPostgresConfiguration(),
+ new RecordingMetricFactory());
mailboxMapperFactory = session -> new PostgresMailboxMapper(new PostgresMailboxDAO(executorFactory.create(session.getUser().getDomainPart())));
}
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
index 6b472e6..55743ba 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
@@ -50,6 +50,7 @@
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
import org.apache.james.utils.UpdatableTickingClock;
import org.junit.jupiter.api.BeforeEach;
@@ -80,7 +81,7 @@
public void setUp() {
BlobId.Factory blobIdFactory = new HashBlobId.Factory();
postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
- postgresExtension.getPostgresConfiguration()),
+ postgresExtension.getPostgresConfiguration(), new RecordingMetricFactory()),
new UpdatableTickingClock(Instant.now()),
new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory),
blobIdFactory);
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
index daf676b..a28bd34 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
@@ -29,6 +29,7 @@
import org.apache.james.mailbox.MailboxSessionUtil;
import org.apache.james.mailbox.store.user.SubscriptionMapperFactory;
import org.apache.james.mailbox.store.user.model.Subscription;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -42,7 +43,7 @@
@BeforeEach
public void setUp() {
PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
- postgresExtension.getPostgresConfiguration());
+ postgresExtension.getPostgresConfiguration(), new RecordingMetricFactory());
subscriptionMapperFactory = session -> new PostgresSubscriptionMapper(new PostgresSubscriptionDAO(executorFactory.create(session.getUser().getDomainPart())));
}
diff --git a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
index a65fa05..a2e882f 100644
--- a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
+++ b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
@@ -34,6 +34,7 @@
import org.apache.james.backends.postgres.utils.PostgresHealthCheck;
import org.apache.james.backends.postgres.utils.SinglePostgresConnectionFactory;
import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.utils.InitializationOperation;
import org.apache.james.utils.InitilizationOperationBuilder;
import org.apache.james.utils.PropertiesProvider;
@@ -141,8 +142,9 @@
@Named(PostgresExecutor.NON_RLS_INJECT)
@Singleton
PostgresExecutor.Factory postgresExecutorFactoryWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT) JamesPostgresConnectionFactory singlePostgresConnectionFactory,
- PostgresConfiguration postgresConfiguration) {
- return new PostgresExecutor.Factory(singlePostgresConnectionFactory, postgresConfiguration);
+ PostgresConfiguration postgresConfiguration,
+ MetricFactory metricFactory) {
+ return new PostgresExecutor.Factory(singlePostgresConnectionFactory, postgresConfiguration, metricFactory);
}
@Provides