blob: 9598ad2099caaa7205130cd976c8a36c055d9c05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.restore;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.util.Modules;
import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.TestModule;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools.TaskExecutorPool;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreJobTest;
import org.apache.cassandra.sidecar.db.RestoreSlice;
import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.server.MainModule;
import org.apache.cassandra.sidecar.stats.RestoreJobStats;
import org.apache.cassandra.sidecar.stats.TestRestoreJobStats;
import org.apache.cassandra.sidecar.utils.SSTableImporter;
import org.mockito.Mockito;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import static org.apache.cassandra.sidecar.AssertionUtils.getBlocking;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class RestoreSliceTaskTest
{
private RestoreSlice restoreSlice;
private StorageClient storageClient;
private TaskExecutorPool executorPool;
private SSTableImporter importer;
private TestRestoreJobStats stats;
private RestoreSliceTask task;
private RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
@BeforeEach
void setup()
{
restoreSlice = mock(RestoreSlice.class, Mockito.RETURNS_DEEP_STUBS);
when(restoreSlice.stageDirectory()).thenReturn(Paths.get("."));
when(restoreSlice.sliceId()).thenReturn("testing-slice");
when(restoreSlice.key()).thenReturn("storage-key");
when(restoreSlice.owner().id()).thenReturn(1);
storageClient = mock(StorageClient.class);
importer = mock(SSTableImporter.class);
Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(new TestModule()));
executorPool = injector.getInstance(ExecutorPools.class).internal();
stats = new TestRestoreJobStats();
sliceDatabaseAccessor = mock(RestoreSliceDatabaseAccessor.class);
task = new TestRestoreSliceTask(restoreSlice, storageClient,
executorPool, importer, 0,
sliceDatabaseAccessor, stats);
}
@Test
void testRestoreSucceeds()
{
when(storageClient.objectExists(restoreSlice)).thenReturn(CompletableFuture.completedFuture(null));
when(storageClient.downloadObjectIfAbsent(restoreSlice))
.thenReturn(CompletableFuture.completedFuture(new File(".")));
Promise<RestoreSlice> promise = Promise.promise();
task.handle(promise);
getBlocking(promise.future()); // no error is thrown
// assert on the stats collected
assertThat(stats.sliceReplicationTimes).hasSize(1);
assertThat(stats.sliceReplicationTimes.get(0)).isPositive();
assertThat(stats.sliceDownloadTimes).hasSize(1);
assertThat(stats.sliceDownloadTimes.get(0)).isPositive();
assertThat(stats.sliceUnzipTimes).hasSize(1);
assertThat(stats.sliceUnzipTimes.get(0)).isPositive();
assertThat(stats.sliceValidationTimes).hasSize(1);
assertThat(stats.sliceValidationTimes.get(0)).isPositive();
assertThat(stats.sliceImportTimes).hasSize(1);
assertThat(stats.sliceImportTimes.get(0)).isPositive();
}
@Test
void testCaptureSliceReplicationTimeOnlyOnce()
{
// the existence of the slice is already confirmed by the s3 client
when(restoreSlice.existsOnS3()).thenReturn(true);
when(storageClient.downloadObjectIfAbsent(restoreSlice))
.thenReturn(CompletableFuture.completedFuture(new File(".")));
Promise<RestoreSlice> promise = Promise.promise();
task.handle(promise);
getBlocking(promise.future()); // no error is thrown
assertThat(stats.sliceReplicationTimes)
.describedAs("The replication time of the slice has been captured when confirming the existence." +
"It should not be captured again in this run.")
.isEmpty();
}
@Test
void testStopProcessingCancelledSlice()
{
when(restoreSlice.isCancelled()).thenReturn(true);
Promise<RestoreSlice> promise = Promise.promise();
task.handle(promise);
assertThatThrownBy(() -> getBlocking(promise.future()))
.hasRootCauseExactlyInstanceOf(RestoreJobFatalException.class)
.hasMessageContaining("Restore slice is cancelled");
}
@Test
void testThrowRetryableExceptionOnS3ObjectNotFound()
{
CompletableFuture<HeadObjectResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(mock(NoSuchKeyException.class));
when(storageClient.objectExists(restoreSlice)).thenReturn(failedFuture);
Promise<RestoreSlice> promise = Promise.promise();
task.handle(promise);
assertThatThrownBy(() -> getBlocking(promise.future()))
.hasRootCauseExactlyInstanceOf(RestoreJobException.class) // NOT a fatal exception
.hasMessageContaining("Object not found");
}
@Test
void testSliceStaging()
{
// test specific setup
RestoreJob job = spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED));
doReturn(true).when(job).isManagedBySidecar();
when(restoreSlice.job()).thenReturn(job);
when(restoreSlice.stagedObjectPath()).thenReturn(Paths.get("nonexist"));
when(storageClient.objectExists(restoreSlice)).thenReturn(CompletableFuture.completedFuture(null));
when(storageClient.downloadObjectIfAbsent(restoreSlice))
.thenReturn(CompletableFuture.completedFuture(new File(".")));
Promise<RestoreSlice> promise = Promise.promise();
task.handle(promise);
getBlocking(promise.future()); // no error is thrown
verify(restoreSlice, times(1)).completeStagePhase();
verify(restoreSlice, times(0)).completeImportPhase(); // should not be called in this phase
verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice);
}
@Test
void testSliceStagingWithExistingObject(@TempDir Path testFolder) throws IOException
{
// test specific setup
RestoreJob job = spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED));
doReturn(true).when(job).isManagedBySidecar();
when(restoreSlice.job()).thenReturn(job);
Path stagedPath = testFolder.resolve("slice.zip");
Files.createFile(stagedPath);
when(restoreSlice.stagedObjectPath()).thenReturn(stagedPath);
when(storageClient.objectExists(restoreSlice))
.thenThrow(new RuntimeException("Should not call this method"));
when(storageClient.downloadObjectIfAbsent(restoreSlice))
.thenThrow(new RuntimeException("Should not call this method"));
Promise<RestoreSlice> promise = Promise.promise();
task.handle(promise);
getBlocking(promise.future()); // no error is thrown
verify(restoreSlice, times(1)).completeStagePhase();
verify(restoreSlice, times(0)).completeImportPhase(); // should not be called in this phase
verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice);
}
@Test
void testSliceImport()
{
// test specific setup
RestoreJob job = spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED));
doReturn(true).when(job).isManagedBySidecar();
when(restoreSlice.job()).thenReturn(job);
Promise<RestoreSlice> promise = Promise.promise();
task.handle(promise);
getBlocking(promise.future()); // no error is thrown
verify(restoreSlice, times(0)).completeStagePhase(); // should not be called in the phase
verify(restoreSlice, times(1)).completeImportPhase();
verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice);
}
static class TestRestoreSliceTask extends RestoreSliceTask
{
private final RestoreSlice slice;
private final RestoreJobStats stats;
public TestRestoreSliceTask(RestoreSlice slice, StorageClient s3Client, TaskExecutorPool executorPool,
SSTableImporter importer, double requiredUsableSpacePercentage,
RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats)
{
super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats);
this.slice = slice;
this.stats = stats;
}
@Override
void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
{
stats.captureSliceUnzipTime(1, 123L);
stats.captureSliceValidationTime(1, 123L);
stats.captureSliceImportTime(1, 123L);
slice.completeImportPhase();
event.tryComplete(slice);
if (onSuccessCommit != null)
{
onSuccessCommit.run();
}
}
@Override
void unzipAndImport(Promise<RestoreSlice> event, File file)
{
unzipAndImport(event, file, null);
}
}
}