blob: 59604a2370d0c054c9926aef519301cbd71d1c74 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.cassandra;
import org.apache.flink.connector.cassandra.source.utils.QueryValidator;
import org.apache.flink.connector.testframe.TestResource;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.MountableFile;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
/**
 * JUnit test environment that contains everything needed at the test suite level: testContainer
 * setup, keyspace setup, Cassandra cluster/session management, and ClusterBuilder setup.
 */
@Testcontainers
public class CassandraTestEnvironment implements TestResource {
private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.1.9";
private static final int CQL_PORT = 9042;
private static final int READ_TIMEOUT_MILLIS = 36000;
public static final String KEYSPACE = "flink";
private static final String CREATE_KEYSPACE_QUERY =
"CREATE KEYSPACE "
+ KEYSPACE
+ " WITH replication= {'class':'SimpleStrategy', 'replication_factor':2};";
public static final String SPLITS_TABLE = "flinksplits";
/*
CREATE TABLE flink.flinksplits (col1 int, col2 int, col3 int, col4 int, PRIMARY KEY ((col1, col2), col3));
- partition key is (col1, col2)
- primary key is ((col1, col2), col3) so col3 is a clustering column
- col4 is a regular column
*/
private static final String CREATE_SPLITS_TABLE_QUERY =
"CREATE TABLE "
+ KEYSPACE
+ "."
+ SPLITS_TABLE
+ " (col1 int, col2 int, col3 int, col4 int, PRIMARY KEY ((col1, col2), col3));";
private static final String CREATE_INDEX =
"CREATE INDEX col4index ON " + KEYSPACE + "." + SPLITS_TABLE + " (col4);";
private static final String INSERT_INTO_FLINK_SPLITS =
"INSERT INTO "
+ KEYSPACE
+ "."
+ SPLITS_TABLE
+ " (col1, col2, col3, col4)"
+ " VALUES (%d, %d, %d, %d)";
private static final int NB_SPLITS_RECORDS = 1000;
private static final int STARTUP_TIMEOUT_MINUTES = 3;
@Container private final CassandraContainer cassandraContainer1;
@Container private final CassandraContainer cassandraContainer2;
boolean insertTestDataForSplitSizeTests;
private Cluster cluster;
private Session session;
private ClusterBuilder builderForReading;
private ClusterBuilder builderForWriting;
private QueryValidator queryValidator;
public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) {
this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests;
Network network = Network.newNetwork();
cassandraContainer1 =
(CassandraContainer)
new CassandraContainer(DOCKER_CASSANDRA_IMAGE)
.withNetwork(network)
.withEnv("CASSANDRA_CLUSTER_NAME", "testcontainers")
.withEnv("CASSANDRA_SEEDS", "cassandra")
.withEnv("JVM_OPTS", "")
.withNetworkAliases("cassandra")
.withCopyFileToContainer(
MountableFile.forClasspathResource("cassandra.yaml"),
"/etc/cassandra/cassandra.yaml" // for timeouts
);
cassandraContainer2 =
(CassandraContainer)
new CassandraContainer(DOCKER_CASSANDRA_IMAGE)
.withNetwork(network)
.withEnv("CASSANDRA_CLUSTER_NAME", "testcontainers")
.withEnv("JVM_OPTS", "")
.withEnv("CASSANDRA_SEEDS", "cassandra")
.withCopyFileToContainer(
MountableFile.forClasspathResource("cassandra.yaml"),
"/etc/cassandra/cassandra.yaml" // for timeouts
);
}
@Override
public void startUp() throws Exception {
startEnv();
}
@Override
public void tearDown() throws Exception {
stopEnv();
}
private void startEnv() throws Exception {
// configure container start to wait until cassandra is ready to receive queries
// start with retrials
cassandraContainer1.waitingFor(
new CassandraQueryWaitStrategy()
.withStartupTimeout(Duration.ofMinutes(STARTUP_TIMEOUT_MINUTES)));
cassandraContainer2.waitingFor(
new CassandraQueryWaitStrategy()
.withStartupTimeout(Duration.ofMinutes(STARTUP_TIMEOUT_MINUTES)));
cassandraContainer1.start();
cassandraContainer1.followOutput(
new Slf4jLogConsumer(LOG),
OutputFrame.OutputType.END,
OutputFrame.OutputType.STDERR,
OutputFrame.OutputType.STDOUT);
cassandraContainer2.start();
cassandraContainer2.followOutput(
new Slf4jLogConsumer(LOG),
OutputFrame.OutputType.END,
OutputFrame.OutputType.STDERR,
OutputFrame.OutputType.STDOUT);
cluster = cassandraContainer1.getCluster();
// ConsistencyLevel.ONE is the minimum level for reading
builderForReading =
createBuilderWithConsistencyLevel(
ConsistencyLevel.ONE,
cassandraContainer1.getHost(),
cassandraContainer1.getMappedPort(CQL_PORT));
queryValidator = new QueryValidator(builderForReading);
builderForWriting =
createBuilderWithConsistencyLevel(
ConsistencyLevel.ONE,
cassandraContainer1.getHost(),
cassandraContainer1.getMappedPort(CQL_PORT));
session = cluster.connect();
executeRequestWithTimeout(CREATE_KEYSPACE_QUERY);
// create a dedicated table for split size tests
if (insertTestDataForSplitSizeTests) {
insertTestDataForSplitSizeTests();
}
}
private void insertTestDataForSplitSizeTests() throws Exception {
executeRequestWithTimeout(CREATE_SPLITS_TABLE_QUERY);
executeRequestWithTimeout(CREATE_INDEX);
for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
executeRequestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, i, i, i));
}
refreshSizeEstimates(SPLITS_TABLE);
}
private void stopEnv() {
if (session != null) {
session.close();
}
if (cluster != null) {
cluster.close();
}
try {
cassandraContainer1.stop();
} catch (Exception e) {
// do not fail the test for a stop failure and allow the other container to stop
LOG.error("Cassandra test container 1 failed to stop.", e);
}
try {
cassandraContainer2.stop();
} catch (Exception e) {
// do not fail the test for a stop failure
LOG.error("Cassandra test container 2 failed to stop.", e);
}
}
private ClusterBuilder createBuilderWithConsistencyLevel(
ConsistencyLevel consistencyLevel, String host, int port) {
return new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPointsWithPorts(new InetSocketAddress(host, port))
.withQueryOptions(
new QueryOptions()
.setConsistencyLevel(consistencyLevel)
.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
.withSocketOptions(
new SocketOptions()
// default timeout x 3
.setConnectTimeoutMillis(15000)
// default timeout x3 and higher than
// request_timeout_in_ms at the cluster level
.setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
.build();
}
};
}
/**
* Force the refresh of system.size_estimates table. It is needed for the tests because we just
* inserted records. It is done on a single node as the size estimation for split generation is
* evaluated based on the ring fraction the connect node represents in the cluster. We first
* flush the MemTables to SSTables because the size estimates are only on SSTables. Then we
* refresh the size estimates.
*/
void refreshSizeEstimates(String table) throws Exception {
final ExecResult execResult1 =
cassandraContainer1.execInContainer("nodetool", "flush", KEYSPACE, table);
final ExecResult execResult2 =
cassandraContainer1.execInContainer("nodetool", "refreshsizeestimates");
if (execResult1.getExitCode() != 0 || execResult2.getExitCode() != 0) {
throw new RuntimeException(
"Failed to refresh system.size_estimates on the Cassandra cluster");
}
List<Row> partitions = new ArrayList<>();
while (partitions.isEmpty()
|| partitions.stream().anyMatch(row -> row.getLong("mean_partition_size") == 0L)) {
Thread.sleep(1000);
partitions =
session.execute(
"SELECT range_start, range_end, partitions_count, mean_partition_size FROM "
+ "system.size_estimates WHERE keyspace_name = ? AND table_name = ?",
KEYSPACE,
table)
.all();
}
}
public ResultSet executeRequestWithTimeout(String query) {
return session.execute(
new SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS));
}
public ClusterBuilder getBuilderForReading() {
return builderForReading;
}
public ClusterBuilder getBuilderForWriting() {
return builderForWriting;
}
public QueryValidator getQueryValidator() {
return queryValidator;
}
public Session getSession() {
return session;
}
public String getContactPoint() {
return cassandraContainer.getHost();
}
public int getPort() {
return cassandraContainer.getMappedPort(CQL_PORT);
}
public String getUsername() {
return cassandraContainer.getUsername();
}
public String getPassword() {
return cassandraContainer.getPassword();
}
}