blob: 879708aad7c415f068499239de5489462b3cc526 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.backends.postgres.utils;
import static org.jooq.impl.DSL.exists;
import static org.jooq.impl.DSL.field;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.inject.Inject;
import org.apache.james.backends.postgres.PostgresConfiguration;
import org.apache.james.core.Domain;
import org.apache.james.metrics.api.MetricFactory;
import org.jooq.DSLContext;
import org.jooq.DeleteResultStep;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.SQLDialect;
import org.jooq.SelectConditionStep;
import org.jooq.conf.Settings;
import org.jooq.conf.StatementType;
import org.jooq.impl.DSL;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.R2dbcBadGrammarException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
public class PostgresExecutor {
public static final String DEFAULT_INJECT = "default";
public static final String NON_RLS_INJECT = "non_rls";
public static final int MAX_RETRY_ATTEMPTS = 5;
public static final Duration MIN_BACKOFF = Duration.ofMillis(1);
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresExecutor.class);
private static final String JOOQ_TIMEOUT_ERROR_LOG = "Time out executing Postgres query. May need to check either jOOQ reactive issue or Postgres DB performance.";
public static class Factory {
private final JamesPostgresConnectionFactory jamesPostgresConnectionFactory;
private final PostgresConfiguration postgresConfiguration;
private final MetricFactory metricFactory;
@Inject
public Factory(JamesPostgresConnectionFactory jamesPostgresConnectionFactory,
PostgresConfiguration postgresConfiguration,
MetricFactory metricFactory) {
this.jamesPostgresConnectionFactory = jamesPostgresConnectionFactory;
this.postgresConfiguration = postgresConfiguration;
this.metricFactory = metricFactory;
}
public PostgresExecutor create(Optional<Domain> domain) {
return new PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain), postgresConfiguration, metricFactory);
}
public PostgresExecutor create() {
return create(Optional.empty());
}
}
private static final SQLDialect PGSQL_DIALECT = SQLDialect.POSTGRES;
private static final Settings SETTINGS = new Settings()
.withRenderFormatted(true)
.withStatementType(StatementType.PREPARED_STATEMENT);
private final Mono<Connection> connection;
private final PostgresConfiguration postgresConfiguration;
private final MetricFactory metricFactory;
private PostgresExecutor(Mono<Connection> connection,
PostgresConfiguration postgresConfiguration,
MetricFactory metricFactory) {
this.connection = connection;
this.postgresConfiguration = postgresConfiguration;
this.metricFactory = metricFactory;
}
public Mono<DSLContext> dslContext() {
return connection.map(con -> DSL.using(con, PGSQL_DIALECT, SETTINGS));
}
public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) {
return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
dslContext()
.flatMap(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()))
.then()));
}
public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> queryFunction) {
return Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
dslContext()
.flatMapMany(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.collectList()
.flatMapIterable(list -> list) // Mitigation fix for https://github.com/jOOQ/jOOQ/issues/16556
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()))));
}
public Flux<Record> executeDeleteAndReturnList(Function<DSLContext, DeleteResultStep<Record>> queryFunction) {
return Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
dslContext()
.flatMapMany(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.collectList()
.flatMapIterable(list -> list) // The convert Flux -> Mono<List> -> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()))));
}
public Mono<Record> executeRow(Function<DSLContext, Publisher<Record>> queryFunction) {
return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
dslContext()
.flatMap(queryFunction.andThen(Mono::from))
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()))));
}
public Mono<Optional<Record>> executeSingleRowOptional(Function<DSLContext, Publisher<Record>> queryFunction) {
return executeRow(queryFunction)
.map(Optional::ofNullable)
.switchIfEmpty(Mono.just(Optional.empty()));
}
public Mono<Integer> executeCount(Function<DSLContext, Mono<Record1<Integer>>> queryFunction) {
return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
dslContext()
.flatMap(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()))
.map(Record1::value1)));
}
public Mono<Boolean> executeExists(Function<DSLContext, SelectConditionStep<?>> queryFunction) {
return executeRow(dslContext -> Mono.from(dslContext.select(field(exists(queryFunction.apply(dslContext))))))
.map(record -> record.get(0, Boolean.class));
}
public Mono<Integer> executeReturnAffectedRowsCount(Function<DSLContext, Mono<Integer>> queryFunction) {
return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
dslContext()
.flatMap(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()))));
}
public Mono<Connection> connection() {
return connection;
}
@VisibleForTesting
public Mono<Void> dispose() {
return connection.flatMap(con -> Mono.from(con.close()));
}
private Predicate<Throwable> preparedStatementConflictException() {
return throwable -> throwable.getCause() instanceof R2dbcBadGrammarException
&& throwable.getMessage().contains("prepared statement")
&& throwable.getMessage().contains("already exists");
}
}