blob: a20ca22f6a9d1de3c54199887097b4f92c538812 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.utils;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.sidecar.Configuration;
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.TableOperations;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(VertxExtension.class)
class SSTableImporterTest
{
private Vertx vertx;
private InstanceMetadataFetcher mockMetadataFetcher;
private Configuration mockConfiguration;
private TableOperations mockTableOperations1;
private ExecutorPools executorPools;
private SSTableUploadsPathBuilder mockUploadPathBuilder;
private SSTableImporter importer;
@BeforeEach
public void setup() throws InterruptedException
{
vertx = Vertx.vertx();
mockMetadataFetcher = mock(InstanceMetadataFetcher.class);
mockConfiguration = mock(Configuration.class);
CassandraAdapterDelegate mockCassandraAdapterDelegate1 = mock(CassandraAdapterDelegate.class);
CassandraAdapterDelegate mockCassandraAdapterDelegate2 = mock(CassandraAdapterDelegate.class);
CassandraAdapterDelegate mockCassandraAdapterDelegate3 = mock(CassandraAdapterDelegate.class);
mockTableOperations1 = mock(TableOperations.class);
TableOperations mockTableOperations2 = mock(TableOperations.class);
when(mockConfiguration.getSSTableImportPollIntervalMillis()).thenReturn(10);
when(mockMetadataFetcher.delegate("localhost")).thenReturn(mockCassandraAdapterDelegate1);
when(mockMetadataFetcher.delegate("127.0.0.2")).thenReturn(mockCassandraAdapterDelegate2);
when(mockMetadataFetcher.delegate("127.0.0.3")).thenReturn(mockCassandraAdapterDelegate3);
when(mockCassandraAdapterDelegate1.tableOperations()).thenReturn(mockTableOperations1);
when(mockTableOperations1.importNewSSTables("ks", "tbl", "/dir", true, true,
true, true, true, true, false))
.thenReturn(Collections.emptyList());
when(mockTableOperations1.importNewSSTables("ks", "tbl", "/failed-dir", true, true,
true, true, true, true, false))
.thenReturn(Collections.singletonList("/failed-dir"));
when(mockCassandraAdapterDelegate2.tableOperations()).thenReturn(mockTableOperations2);
when(mockTableOperations2.importNewSSTables("ks", "tbl", "/dir", true, true,
true, true, true, true, false))
.thenThrow(new RuntimeException("Exception during import"));
WorkerPoolConfiguration workerPoolConf = new WorkerPoolConfiguration("test-pool", 10,
TimeUnit.SECONDS.toMillis(30));
when(mockConfiguration.serverWorkerPoolConfiguration()).thenReturn(workerPoolConf);
when(mockConfiguration.serverInternalWorkerPoolConfiguration()).thenReturn(workerPoolConf);
executorPools = new ExecutorPools(vertx, mockConfiguration);
mockUploadPathBuilder = mock(SSTableUploadsPathBuilder.class);
// since we are not actually creating any files for the test, we need to handle cleanup such that we don't
// get NullPointerExceptions because the mock is not wired up, and we need to prevent vertx from actually
// doing a vertx.filesystem().deleteRecursive(). So we return a failed future with a fake path when checking
// if the directory exists.
when(mockUploadPathBuilder.resolveUploadIdDirectory(anyString(), anyString()))
.thenReturn(Future.failedFuture("fake-path"));
when(mockUploadPathBuilder.isValidDirectory("fake-path")).thenReturn(Future.failedFuture("skip cleanup"));
importer = new SSTableImporter(vertx, mockMetadataFetcher, mockConfiguration, executorPools,
mockUploadPathBuilder);
}
@Test
void testImportSucceeds(VertxTestContext context)
{
Future<Void> importFuture = importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
.host("localhost")
.keyspace("ks")
.tableName("tbl")
.directory("/dir")
.uploadId("0000-0000")
.build());
importFuture.onComplete(context.succeeding(v -> {
assertThat(importer.importQueuePerHost).isNotEmpty();
assertThat(importer.importQueuePerHost).containsKey("localhost$ks$tbl");
for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
{
assertThat(queue).isEmpty();
}
verify(mockTableOperations1, times(1))
.importNewSSTables("ks", "tbl", "/dir", true, true, true, true, true, true, false);
context.completeNow();
}));
}
@Test
void testImportFailsWhenCassandraIsUnavailable(VertxTestContext context)
{
Future<Void> importFuture = importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
.host("127.0.0.3")
.keyspace("ks")
.tableName("tbl")
.directory("/dir3")
.uploadId("0000-0000")
.build());
importFuture.onComplete(context.failing(p -> {
assertThat(p).isInstanceOf(HttpException.class);
HttpException exception = (HttpException) p;
assertThat(exception.getStatusCode()).isEqualTo(HttpResponseStatus.SERVICE_UNAVAILABLE.code());
assertThat(exception.getPayload()).isEqualTo("Cassandra service is unavailable");
assertThat(importer.importQueuePerHost).isNotEmpty();
assertThat(importer.importQueuePerHost).containsKey("127.0.0.3$ks$tbl");
for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
{
assertThat(queue).isEmpty();
}
context.completeNow();
}));
}
@Test
void testImportFailsWhenImportReturnsFailedDirectories(VertxTestContext context)
{
Future<Void> importFuture = importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
.host("localhost")
.keyspace("ks")
.tableName("tbl")
.directory("/failed-dir")
.uploadId("0000-0000")
.build());
importFuture.onComplete(context.failing(p -> {
assertThat(p).isInstanceOf(HttpException.class);
HttpException exception = (HttpException) p;
assertThat(exception.getStatusCode()).isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
assertThat(exception.getPayload()).isEqualTo("Failed to import from directories: [/failed-dir]");
assertThat(importer.importQueuePerHost).isNotEmpty();
assertThat(importer.importQueuePerHost).containsKey("localhost$ks$tbl");
for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
{
assertThat(queue).isEmpty();
}
context.completeNow();
}));
}
@Test
void testImportFailsWhenImportThrowsIOException(VertxTestContext context)
{
Future<Void> importFuture = importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
.host("127.0.0.2")
.keyspace("ks")
.tableName("tbl")
.directory("/dir")
.uploadId("0000-0000")
.build());
importFuture.onComplete(context.failing(p -> {
assertThat(p).isInstanceOf(RuntimeException.class);
RuntimeException exception = (RuntimeException) p;
assertThat(exception.getMessage()).isEqualTo("Exception during import");
assertThat(importer.importQueuePerHost).isNotEmpty();
assertThat(importer.importQueuePerHost).containsKey("127.0.0.2$ks$tbl");
for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
{
assertThat(queue).isEmpty();
}
context.completeNow();
}));
}
@Test
void testCancelImportSucceeds(VertxTestContext context)
{
when(mockConfiguration.getSSTableImportPollIntervalMillis()).thenReturn(500);
SSTableImporter importer = new SSTableImporter(vertx, mockMetadataFetcher, mockConfiguration, executorPools,
mockUploadPathBuilder);
SSTableImporter.ImportOptions options = new SSTableImporter.ImportOptions.Builder()
.host("localhost")
.keyspace("ks")
.tableName("tbl")
.directory("/dir")
.uploadId("0000-0000")
.build();
importer.scheduleImport(options);
assertThat(importer.cancelImport(options)).isTrue();
context.completeNow();
}
@Test
void testCancelImportNoOpAfterProcessing(VertxTestContext context)
{
SSTableImporter.ImportOptions options = new SSTableImporter.ImportOptions.Builder()
.host("localhost")
.keyspace("ks")
.tableName("tbl")
.directory("/dir")
.uploadId("0000-0000")
.build();
Future<Void> importFuture = importer.scheduleImport(options);
importFuture.onComplete(context.succeeding(v -> {
assertThat(importer.cancelImport(options)).isFalse();
context.completeNow();
}));
}
}