blob: 028b436346afe475a29d1a984967ee5a38615ba4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.sidecar.testing;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.util.Modules;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.distributed.shared.ShutdownException;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.InstancesConfig;
import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl;
import org.apache.cassandra.sidecar.common.CQLSessionProvider;
import org.apache.cassandra.sidecar.common.JmxClient;
import org.apache.cassandra.sidecar.common.dns.DnsResolver;
import org.apache.cassandra.sidecar.common.utils.DriverUtils;
import org.apache.cassandra.sidecar.common.utils.SidecarVersionProvider;
import org.apache.cassandra.sidecar.config.JmxConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import org.apache.cassandra.sidecar.server.MainModule;
import org.apache.cassandra.sidecar.server.Server;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
import org.apache.cassandra.testing.TestUtils;
import org.apache.cassandra.testing.TestVersion;
import org.apache.cassandra.testing.TestVersionSupplier;
import org.apache.cassandra.utils.Throwables;
import static org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext.tryGetIntConfig;
import static org.assertj.core.api.Assertions.assertThat;
/**
* This class provides an opinionated way to run integration tests. The {@link #setup()} method runs once at the
* beginning of all the tests in the implementation, as well as the {@link #tearDown()} method. The tests will share
* the same cluster throughout the lifetime of the tests, which means that implementers must be aware that any cluster
* alteration will have an impact on subsequent test runs, so it is recommended that tests run in isolated
* keyspaces/tables when required. Additionally, the state of the cluster should ideally remain the same for all
* tests, so ideally tests should not alter the state of the cluster in a way that would affect other tests.
*
* <p>The setup will run the following steps:
*
* <ol>
* <li>Find the first version from the {@link TestVersionSupplier#testVersions()}
* <li>Provision a cluster for the test using the version from the previous step (implementer must supply)
* <li>Initialize schemas required for the test (implementer must supply)
* <li>Start sidecar that talks to the provisioned cluster
* <li>(Optional) Run the before test start method (implementer can supply)
* </ol>
*
* <p>The above order guarantees that the cluster and Sidecar are both ready by the time the test
* setup completes. Removing the need to wait for schema propagation from the cluster to Sidecar,
* and removing the need to poll for schema changes to propagate. This helps in improving test
* time.
*
* <p>For the teardown of the test the steps are the following:
*
* <ol>
* <li>(Optional) Before sidecar stops (implementer can supply)
* <li>Stop sidecar
* <li>(Optional) Before cluster shutdowns (implementer can supply)
* <li>Close cluster
* <li>(Optional) Before tear down ends (implementer can supply)
* </ol>
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(VertxExtension.class)
public abstract class SharedClusterIntegrationTestBase
{
protected final Logger logger = LoggerFactory.getLogger(SharedClusterIntegrationTestBase.class);
private static final int MAX_CLUSTER_PROVISION_RETRIES = 5;
protected Vertx vertx;
protected DnsResolver dnsResolver;
protected AbstractCluster<? extends IInstance> cluster;
protected Server server;
protected Injector injector;
static
{
// Initialize defaults to configure the in-jvm dtest
TestUtils.configureDefaultDTestJarProperties();
}
@BeforeAll
protected void setup() throws InterruptedException, IOException
{
Optional<TestVersion> testVersion = TestVersionSupplier.testVersions().findFirst();
assertThat(testVersion).isPresent();
logger.info("Testing with version={}", testVersion);
cluster = provisionClusterWithRetries(testVersion.get());
assertThat(cluster).isNotNull();
initializeSchemaForTest();
startSidecar(cluster);
beforeTestStart();
}
protected AbstractCluster<? extends IInstance> provisionClusterWithRetries(TestVersion testVersion) throws IOException
{
for (int retry = 0; retry < MAX_CLUSTER_PROVISION_RETRIES; retry++)
{
try
{
return provisionCluster(testVersion);
}
catch (RuntimeException runtimeException)
{
boolean addressAlreadyInUse = Throwables.anyCauseMatches(runtimeException,
ex -> ex instanceof BindException &&
StringUtils.contains(ex.getMessage(), "Address already in use"));
if (addressAlreadyInUse)
{
logger.warn("Failed to provision cluster after {} retries", retry, runtimeException);
}
else
{
throw runtimeException;
}
}
}
throw new RuntimeException("Unable to provision cluster");
}
@AfterAll
protected void tearDown() throws InterruptedException
{
beforeSidecarStop();
stopSidecar();
beforeClusterShutdown();
closeCluster();
afterClusterShutdown();
}
/**
* @param testVersion the Cassandra version to use for the test
* @return a provisioned cluster to use for tests
* @throws IOException when provisioning a cluster fails
*/
protected abstract UpgradeableCluster provisionCluster(TestVersion testVersion) throws IOException;
/**
* Initialize required schemas for the tests upfront before the test starts
*/
protected abstract void initializeSchemaForTest();
/**
* Override to perform an action before the tests start
*/
protected void beforeTestStart()
{
}
/**
* Override to perform an action before Sidecar stops
*/
protected void beforeSidecarStop()
{
}
/**
* Override to perform an action before the cluster stops
*/
protected void beforeClusterShutdown()
{
}
/**
* Override to perform an action after the cluster has shutdown
*/
protected void afterClusterShutdown()
{
}
protected void createTestKeyspace(QualifiedName name, Map<String, Integer> rf)
{
createTestKeyspace(name.maybeQuotedKeyspace(), rf);
}
protected void createTestKeyspace(String keyspace, Map<String, Integer> rf)
{
cluster.schemaChangeIgnoringStoppedInstances("CREATE KEYSPACE IF NOT EXISTS " + keyspace
+ " WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', " +
generateRfString(rf) + " };");
}
protected void createTestTable(QualifiedName name, String createTableStatement)
{
cluster.schemaChangeIgnoringStoppedInstances(String.format(createTableStatement, name));
}
/**
* Starts Sidecar configured to run against the provided Cassandra {@code cluster}.
*
* @param cluster the cluster to use
* @throws InterruptedException when the startup times out
*/
protected void startSidecar(AbstractCluster<? extends IInstance> cluster) throws InterruptedException
{
VertxTestContext context = new VertxTestContext();
injector = Guice.createInjector(Modules.override(new MainModule()).with(new IntegrationTestModule(cluster)));
dnsResolver = injector.getInstance(DnsResolver.class);
vertx = injector.getInstance(Vertx.class);
server = injector.getInstance(Server.class);
server.start()
.onSuccess(s -> context.completeNow())
.onFailure(context::failNow);
context.awaitCompletion(5, TimeUnit.SECONDS);
}
/**
* Stops the Sidecar service
*
* @throws InterruptedException when stopping sidecar times out
*/
protected void stopSidecar() throws InterruptedException
{
if (server == null)
{
return;
}
CountDownLatch closeLatch = new CountDownLatch(1);
server.close().onSuccess(res -> closeLatch.countDown());
if (closeLatch.await(60, TimeUnit.SECONDS))
{
logger.info("Close event received before timeout.");
}
else
{
logger.error("Close event timed out.");
}
}
/**
* Closes the cluster and its resources
*/
protected void closeCluster()
{
if (cluster == null)
{
return;
}
logger.info("Closing cluster={}", cluster);
try
{
cluster.close();
}
// ShutdownException may be thrown from a different classloader, and therefore the standard
// `catch (ShutdownException)` won't always work - compare the canonical names instead.
catch (Throwable t)
{
if (Objects.equals(t.getClass().getCanonicalName(), ShutdownException.class.getCanonicalName()))
{
logger.debug("Encountered shutdown exception which closing the cluster", t);
}
else
{
throw t;
}
}
}
protected String generateRfString(Map<String, Integer> rf)
{
return rf.entrySet()
.stream()
.map(entry -> String.format("'%s':%d", entry.getKey(), entry.getValue()))
.collect(Collectors.joining(","));
}
/**
* Convenience method to query all data from the provided {@code table} at consistency level {@code LOCAL_QUORUM}.
*
* @param table the qualified Cassandra table name
* @return all the data queried from the table
*/
protected Object[][] queryAllData(QualifiedName table)
{
return queryAllData(table, ConsistencyLevel.LOCAL_QUORUM);
}
/**
* Convenience method to query all data from the provided {@code table} at the specified consistency level.
*
* @param table the qualified Cassandra table name
* @param consistencyLevel the consistency level to use for querying the data
* @return all the data queried from the table
*/
protected Object[][] queryAllData(QualifiedName table, ConsistencyLevel consistencyLevel)
{
return cluster.coordinator(1).execute(String.format("SELECT * FROM %s;", table), consistencyLevel);
}
static class IntegrationTestModule extends AbstractModule
{
private final AbstractCluster<? extends IInstance> cluster;
IntegrationTestModule(AbstractCluster<? extends IInstance> cluster)
{
this.cluster = cluster;
}
@Provides
@Singleton
public InstancesConfig instancesConfig(Vertx vertx,
SidecarConfiguration configuration,
CassandraVersionProvider cassandraVersionProvider,
SidecarVersionProvider sidecarVersionProvider,
DnsResolver dnsResolver)
{
JmxConfiguration jmxConfiguration = configuration.serviceConfiguration().jmxConfiguration();
List<InetSocketAddress> contactPoints = buildContactPoints();
CQLSessionProvider cqlSessionProvider = new TemporaryCqlSessionProvider(contactPoints,
SharedExecutorNettyOptions.INSTANCE);
List<InstanceMetadata> instanceMetadataList =
IntStream.range(0, cluster.size())
.mapToObj(i -> buildInstanceMetadata(vertx,
cluster.get(i + 1),
cassandraVersionProvider,
sidecarVersionProvider.sidecarVersion(),
jmxConfiguration,
cqlSessionProvider,
dnsResolver))
.collect(Collectors.toList());
return new InstancesConfigImpl(instanceMetadataList, dnsResolver);
}
@Provides
@Singleton
public SidecarConfiguration sidecarConfiguration()
{
ServiceConfiguration conf = ServiceConfigurationImpl.builder()
.host("0.0.0.0") // binds to all interfaces, potential security issue if left running for long
.port(0) // let the test find an available port
.build();
return SidecarConfigurationImpl.builder()
.serviceConfiguration(conf)
.build();
}
@Provides
@Singleton
public DnsResolver dnsResolver()
{
return new IntegrationTestBase.LocalhostResolver();
}
private List<InetSocketAddress> buildContactPoints()
{
return cluster.stream()
.map(instance -> new InetSocketAddress(instance.config().broadcastAddress().getAddress(),
tryGetIntConfig(instance.config(), "native_transport_port", 9042)))
.collect(Collectors.toList());
}
static InstanceMetadata buildInstanceMetadata(Vertx vertx,
IInstance cassandraInstance,
CassandraVersionProvider versionProvider,
String sidecarVersion,
JmxConfiguration jmxConfiguration,
CQLSessionProvider session,
DnsResolver dnsResolver)
{
IInstanceConfig config = cassandraInstance.config();
String ipAddress = JMXUtil.getJmxHost(config);
String hostName;
try
{
hostName = dnsResolver.reverseResolve(ipAddress);
}
catch (UnknownHostException e)
{
hostName = ipAddress;
}
int port = tryGetIntConfig(config, "native_transport_port", 9042);
String[] dataDirectories = (String[]) config.get("data_file_directories");
String stagingDir = stagingDir(dataDirectories);
JmxClient jmxClient = JmxClient.builder()
.host(ipAddress)
.port(config.jmxPort())
.connectionMaxRetries(jmxConfiguration.maxRetries())
.connectionRetryDelayMillis(jmxConfiguration.retryDelayMillis())
.build();
CassandraAdapterDelegate delegate = new CassandraAdapterDelegate(vertx,
config.num(),
versionProvider,
session,
jmxClient,
new DriverUtils(),
sidecarVersion,
ipAddress,
port);
return InstanceMetadataImpl.builder()
.id(config.num())
.host(hostName)
.port(port)
.dataDirs(Arrays.asList(dataDirectories))
.stagingDir(stagingDir)
.delegate(delegate)
.globalMetricRegistryName("test")
.build();
}
private static String stagingDir(String[] dataDirectories)
{
// Use the parent of the first data directory as the staging directory
Path dataDirParentPath = Paths.get(dataDirectories[0]).getParent();
// If the cluster has not started yet, the node's root directory doesn't exist yet
assertThat(dataDirParentPath).isNotNull();
Path stagingPath = dataDirParentPath.resolve("staging");
return stagingPath.toFile().getAbsolutePath();
}
}
}