blob: 628c184e3f184ee55e73613b366bf25e4e023bda [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.routes.sstableuploads;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.junit.jupiter.api.extension.ExtendWith;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.sidecar.IntegrationTestBase;
import org.apache.cassandra.sidecar.common.SimpleCassandraVersion;
import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
import org.apache.cassandra.testing.CassandraIntegrationTest;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
/**
* Integration tests for {@link SSTableImportHandler}
*/
@ExtendWith(VertxExtension.class)
public class SSTableImportHandlerIntegrationTest extends IntegrationTestBase
{
public static final SimpleCassandraVersion MIN_VERSION_WITH_IMPORT = SimpleCassandraVersion.create("4.0.0");
@CassandraIntegrationTest
void testSSTableImport(VertxTestContext vertxTestContext)
throws IOException, InterruptedException
{
// Cassandra before 4.0 does not have the necessary JMX endpoints,
// so we skip if the cluster version is below 4.0
assumeThat(sidecarTestContext.version)
.withFailMessage("Import is only available in Cassandra 4.0 and later.")
.isGreaterThanOrEqualTo(MIN_VERSION_WITH_IMPORT);
// create a table. Insert some data, create a snapshot that we'll use for import.
// Truncate the table, insert more data.
// Test the import SSTable endpoint by importing data that was originally truncated.
// Verify by querying the table contains all the results before truncation and after truncation.
Session session = maybeGetSession();
createTestKeyspace();
QualifiedTableName tableName = createTestTableAndPopulate(sidecarTestContext, Arrays.asList("a", "b"));
// create a snapshot called <tableName>-snapshot for tbl1
UpgradeableCluster cluster = sidecarTestContext.cluster();
final String snapshotStdout = cluster.get(1).nodetoolResult("snapshot",
"--tag", tableName.tableName() + "-snapshot",
"--table", tableName.tableName(),
"--", tableName.keyspace()).getStdout();
assertThat(snapshotStdout).contains("Snapshot directory: " + tableName.tableName() + "-snapshot");
// find the directory in the filesystem
final List<Path> snapshotFiles = findChildFile(sidecarTestContext,
"127.0.0.1", tableName.tableName() + "-snapshot");
assertThat(snapshotFiles).isNotEmpty();
// copy the snapshot to the expected staging directory
final UUID uploadId = UUID.randomUUID();
// for the test, we need to have the same directory structure for the local data directory replicated
// inside the cluster. The way the endpoint works is by verifying that the directory exists, this
// verification happens in the host system. When calling import we use the same directory, but the
// directory does not exist inside the cluster. For that reason we need to do the following to
// ensure "import" finds the path inside the cluster
String uploadStagingDir = sidecarTestContext.instancesConfig()
.instanceFromHost("127.0.0.1").stagingDir();
final String stagingPathInContainer = uploadStagingDir + File.separator + uploadId
+ File.separator + tableName.keyspace()
+ File.separator + tableName.tableName();
boolean mkdirs = new File(stagingPathInContainer).mkdirs();
assertThat(mkdirs)
.withFailMessage("Could not create directory " + uploadStagingDir)
.isTrue();
// copy snapshot files into the staging path in the cluster
for (Path path : snapshotFiles)
{
if (path.toFile().isFile())
{
Files.copy(path, Paths.get(stagingPathInContainer).resolve(path.getFileName()));
}
}
// Now truncate the contents of the table
truncateAndVerify(tableName);
// Add new data (c, d) to table
populateTable(session, tableName, Arrays.asList("c", "d"));
WebClient client = WebClient.create(vertx);
String testRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" + tableName.keyspace()
+ "/tables/" + tableName.tableName() + "/import";
sendRequest(vertxTestContext,
() -> client.put(server.actualPort(), "127.0.0.1", testRoute),
vertxTestContext.succeeding(response -> vertxTestContext.verify(() -> {
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
assertThat(queryValues(tableName))
.containsAll(Arrays.asList("a", "b", "c", "d"));
vertxTestContext.completeNow();
})));
// wait until test completes
assertThat(vertxTestContext.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
}
private void sendRequest(VertxTestContext vertxTestContext, Supplier<HttpRequest<Buffer>> requestSupplier,
Handler<AsyncResult<HttpResponse<Buffer>>> handler)
{
requestSupplier.get()
.send(vertxTestContext.succeeding(r -> vertxTestContext.verify(() -> {
int statusCode = r.statusCode();
if (statusCode == HttpResponseStatus.ACCEPTED.code())
{
// retry the request every second when the request is accepted
vertx.setTimer(1000, tid -> sendRequest(vertxTestContext, requestSupplier, handler));
}
else
{
handler.handle(Future.succeededFuture(r));
}
})));
}
private void truncateAndVerify(QualifiedTableName qualifiedTableName)
throws InterruptedException
{
Session session = maybeGetSession();
session.execute("TRUNCATE TABLE " + qualifiedTableName);
while (true)
{
TimeUnit.MILLISECONDS.sleep(100);
ResultSet rs = session.execute("SELECT * FROM " + qualifiedTableName);
if (rs.all().size() == 0)
break; // truncate succeeded
}
}
private List<String> queryValues(QualifiedTableName tableName)
{
Session session = maybeGetSession();
return session.execute("SELECT id FROM " + tableName)
.all()
.stream()
.map(row -> row.getString("id"))
.collect(Collectors.toList());
}
private QualifiedTableName createTestTableAndPopulate(CassandraSidecarTestContext cassandraTestContext,
List<String> values)
{
QualifiedTableName tableName = createTestTable(
"CREATE TABLE IF NOT EXISTS %s (id text, PRIMARY KEY(id));");
Session session = maybeGetSession();
populateTable(session, tableName, values);
return tableName;
}
private void populateTable(Session session, QualifiedTableName tableName, List<String> values)
{
for (String value : values)
{
session.execute(String.format("INSERT INTO %s (id) VALUES ('%s');", tableName, value));
}
}
}