blob: 4d33d78341b12ba1ab80eaf577af448804d21dba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.testutils.databases.postgres;
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.apache.flink.util.FlinkRuntimeException;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
 * Test database extension backed by a single, statically shared PostgreSQL container.
 *
 * <p>The container is a {@link PostgresXaContainer} created from the {@code POSTGRES_15} image
 * name and configured with 10 maximum connections and 50 maximum prepared transactions.
 */
public class PostgresDatabase extends DatabaseExtension implements PostgresImages {

    /** Shared container; started by {@link #startDatabase()}, stopped by {@link #stopDatabase()}. */
    private static final PostgreSQLContainer<?> CONTAINER =
            new PostgresXaContainer(POSTGRES_15).withMaxConnections(10).withMaxTransactions(50);

    /** Metadata created on first request and discarded again in {@link #stopDatabase()}. */
    private static PostgresMetadata metadata;

    /**
     * Returns the metadata describing the shared container, creating it on first use.
     *
     * @return the cached {@link PostgresMetadata} instance
     * @throws FlinkRuntimeException if the container is not currently running
     */
    public static PostgresMetadata getMetadata() {
        if (CONTAINER.isRunning()) {
            if (metadata == null) {
                // NOTE(review): the 'true' flag presumably enables XA in the metadata — confirm
                // against the PostgresMetadata constructor.
                metadata = new PostgresMetadata(CONTAINER, true);
            }
            return metadata;
        }
        throw new FlinkRuntimeException("Container is stopped.");
    }

    /**
     * Starts the shared container and hands back its metadata.
     *
     * @return the metadata obtained from {@link #getMetadata()} after the container was started
     */
    @Override
    protected DatabaseMetadata startDatabase() throws Exception {
        CONTAINER.start();
        return getMetadata();
    }

    /** Stops the shared container and drops the cached metadata. */
    @Override
    protected void stopDatabase() throws Exception {
        CONTAINER.stop();
        metadata = null;
    }

    /**
     * A {@link PostgreSQLContainer} that has XA enabled by passing {@code
     * max_prepared_transactions} to the server on start-up.
     */
    public static class PostgresXaContainer extends PostgreSQLContainer<PostgresXaContainer> {

        /** Value passed to the server as {@code superuser_reserved_connections}. */
        private static final int SUPERUSER_RESERVED_CONNECTIONS = 1;

        /** Value passed as {@code max_connections}; defaults to one above the reserved count. */
        private int maxConnections = SUPERUSER_RESERVED_CONNECTIONS + 1;

        /** Value passed as {@code max_prepared_transactions}; defaults to 1. */
        private int maxTransactions = 1;

        /**
         * Creates a container for the given image.
         *
         * @param dockerImageName image name, parsed with {@link DockerImageName#parse(String)}
         */
        public PostgresXaContainer(String dockerImageName) {
            super(DockerImageName.parse(dockerImageName));
        }

        /**
         * Sets the {@code max_connections} value used when the container is started.
         *
         * @param maxConnections connection limit; must exceed the superuser reserved connections
         * @return this container, for chaining
         * @throws IllegalArgumentException if the limit does not exceed the reserved connections
         *     (raised by {@code Preconditions.checkArgument})
         */
        public PostgresXaContainer withMaxConnections(int maxConnections) {
            final boolean exceedsReserved = maxConnections > SUPERUSER_RESERVED_CONNECTIONS;
            checkArgument(
                    exceedsReserved,
                    "maxConnections should be greater than superuser_reserved_connections");
            this.maxConnections = maxConnections;
            return self();
        }

        /**
         * Sets the {@code max_prepared_transactions} value used when the container is started.
         *
         * @param maxTransactions prepared-transaction limit; must be greater than 1
         * @return this container, for chaining
         * @throws IllegalArgumentException if the limit is not greater than 1 (raised by {@code
         *     Preconditions.checkArgument})
         */
        public PostgresXaContainer withMaxTransactions(int maxTransactions) {
            final boolean moreThanOne = maxTransactions > 1;
            checkArgument(moreThanOne, "maxTransactions should be greater 1");
            this.maxTransactions = maxTransactions;
            return self();
        }

        /**
         * Installs the server command line built from the configured limits, then delegates to
         * the regular container start.
         */
        @Override
        public void start() {
            setCommand(buildServerCommand());
            super.start();
        }

        /** Assembles the {@code postgres} command with one {@code -c key=value} pair per setting. */
        private String[] buildServerCommand() {
            return new String[] {
                "postgres",
                "-c",
                "superuser_reserved_connections=" + SUPERUSER_RESERVED_CONNECTIONS,
                "-c",
                "max_connections=" + maxConnections,
                "-c",
                "max_prepared_transactions=" + maxTransactions,
                "-c",
                // Passed through verbatim; presumably trades durability for test speed.
                "fsync=off"
            };
        }
    }
}