CASSANDRASC-105 RestoreSliceTask could be stuck due to missing exception handling (#107)
patch by Saranya Krishnakumar; reviewed by Francisco Guerrero, Yifan Cai for CASSANDRASC-105
diff --git a/CHANGES.txt b/CHANGES.txt
index 0fdc914..e625cde 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105)
* Make hash algorithm implementation pluggable (CASSANDRASC-114)
* Fix ClosedChannelException when downloading from S3 (CASSANDRASC-112)
* Fix NPE thrown when getting StorageClient from pool (CASSANDRASC-110)
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index ecd0734..4e942c7 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -107,7 +107,7 @@
slice.compressedSize() + slice.uncompressedSize(),
requiredUsableSpacePercentage,
executorPool)
- .onSuccess(ignored -> {
+ .compose(v -> {
RestoreJob job = slice.job();
if (job.isManagedBySidecar())
{
@@ -120,55 +120,63 @@
slice.completeStagePhase(); // update the flag if missed
sliceDatabaseAccessor.updateStatus(slice);
event.tryComplete(slice);
- return;
+ return Future.succeededFuture();
}
// 1. check object existence and validate eTag / checksum
- checkObjectExistence(event)
+ CompletableFuture<Void> fut = checkObjectExistence(event)
// 2. download slice/object when the remote object exists
.thenCompose(headObject -> downloadSlice(event))
// 3. persist status
- .thenAccept(x -> {
+ .thenAccept(file -> {
slice.completeStagePhase();
sliceDatabaseAccessor.updateStatus(slice);
// completed staging. A new task is produced when it comes to import
event.tryComplete(slice);
+ })
+ .whenComplete((x, cause) -> {
+ if (cause != null)
+ {
+ // handle unexpected errors thrown during download slice call, that do not close event
+ event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause));
+ }
});
+
+ return Future.fromCompletionStage(fut);
}
else if (job.status == RestoreJobStatus.STAGED)
{
- unzipAndImport(event, slice.stagedObjectPath().toFile(),
- // persist status
- () -> sliceDatabaseAccessor.updateStatus(slice));
+ return unzipAndImport(event, slice.stagedObjectPath().toFile(),
+ // persist status
+ () -> sliceDatabaseAccessor.updateStatus(slice));
}
else
{
String msg = "Unexpected restore job status. Expected only CREATED or STAGED when " +
"processing active slices. Found status: " + job.status;
Exception unexpectedState = new IllegalStateException(msg);
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status",
- slice, unexpectedState));
+ return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status",
+ slice, unexpectedState));
}
}
else
{
- downloadSliceAndImport(event);
+ return downloadSliceAndImport(event);
}
})
- .onFailure(cause -> {
- String msg = "Unable to ensure enough space for the slice. Retry later";
- event.tryFail(RestoreJobExceptions.ofSlice(msg, slice, cause));
- });
+ .onSuccess(v -> event.tryComplete(slice))
+ .onFailure(cause -> event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause)));
}
- private void downloadSliceAndImport(Promise<RestoreSlice> event)
+ private Future<Void> downloadSliceAndImport(Promise<RestoreSlice> event)
{
// 1. check object existence and validate eTag / checksum
- checkObjectExistence(event)
+ CompletableFuture<File> fut = checkObjectExistence(event)
// 2. download slice/object when the remote object exists
- .thenCompose(headObject -> downloadSlice(event))
+ .thenCompose(headObject -> downloadSlice(event));
// 3. unzip the file and import/commit
- .thenAccept(file -> unzipAndImport(event, file));
+ return Future.fromCompletionStage(fut)
+ .compose(file -> unzipAndImport(event, file));
}
private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice> event)
@@ -281,21 +289,21 @@
}
@VisibleForTesting
- void unzipAndImport(Promise<RestoreSlice> event, File file)
+ Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file)
{
- unzipAndImport(event, file, null);
+ return unzipAndImport(event, file, null);
}
- void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
+ Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
{
if (file == null) // the condition should never happen. Having it here for logic completeness
{
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Object not found from disk", slice, null));
- return;
+ return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Object not found from disk",
+ slice, null));
}
// run the rest in the executor pool, instead of S3 client threadpool
- unzip(file)
+ return unzip(file)
.compose(this::validateFiles)
.compose(this::commit)
.compose(x -> {
@@ -304,7 +312,7 @@
return Future.succeededFuture();
}
- return executorPool.executeBlocking(promise -> {
+ return executorPool.<Void>executeBlocking(promise -> {
onSuccessCommit.run();
promise.tryComplete();
});
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
index 0174e12..f26e859 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
@@ -70,8 +70,8 @@
.compose(computedDigest -> {
if (!computedDigest.equals(digest.value()))
{
- logger.error("Digest mismatch. computed_digest={}, expected_digest={}, algorithm=MD5",
- computedDigest, digest.value());
+ logger.error("Digest mismatch. computed_digest={}, expected_digest={}, algorithm={}",
+ computedDigest, digest.value(), digest.algorithm());
return Future.failedFuture(new HttpException(CHECKSUM_MISMATCH.code(),
String.format("Digest mismatch. "
+ "expected_digest=%s, "
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 728f5d2..6e9e1d7 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -31,6 +31,7 @@
import org.junit.jupiter.api.io.TempDir;
import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
@@ -226,6 +227,81 @@
assertThat(sliceDatabaseAccessor.updateInvokedTimes.get()).isOne();
}
+ @Test
+ void testHandlingUnexpectedExceptionInObjectExistsCheck(@TempDir Path testFolder)
+ {
+ RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED, "QUORUM");
+ when(mockStorageClient.objectExists(mockSlice)).thenThrow(new RuntimeException("Random exception"));
+ Path stagedPath = testFolder.resolve("slice.zip");
+ when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+
+ Promise<RestoreSlice> promise = Promise.promise();
+
+ RestoreSliceTask task = createTask(mockSlice, job);
+ task.handle(promise);
+
+ assertThatThrownBy(() -> getBlocking(promise.future()))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Random exception");
+ }
+
+ @Test
+ void testHandlingUnexpectedExceptionDuringDownloadSliceCheck(@TempDir Path testFolder)
+ {
+ RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED, "QUORUM");
+ Path stagedPath = testFolder.resolve("slice.zip");
+ when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+ when(mockSlice.isCancelled()).thenReturn(false);
+ when(mockStorageClient.objectExists(mockSlice)).thenReturn(CompletableFuture.completedFuture(null));
+ when(mockStorageClient.downloadObjectIfAbsent(mockSlice)).thenThrow(new RuntimeException("Random exception"));
+
+ Promise<RestoreSlice> promise = Promise.promise();
+
+ RestoreSliceTask task = createTask(mockSlice, job);
+ task.handle(promise);
+
+ assertThatThrownBy(() -> getBlocking(promise.future()))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Random exception");
+ }
+
+ @Test
+ void testHandlingUnexpectedExceptionDuringUnzip(@TempDir Path testFolder) throws IOException
+ {
+
+ RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED, "QUORUM");
+ Path stagedPath = testFolder.resolve("slice.zip");
+ Files.createFile(stagedPath);
+ when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+ Promise<RestoreSlice> promise = Promise.promise();
+ RestoreSliceTask task = createTaskWithExceptions(mockSlice, job);
+ task.handle(promise);
+
+ assertThatThrownBy(() -> getBlocking(promise.future()))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Random exception");
+ }
+
+ @Test
+ void testHandlingUnexpectedExceptionDuringDownloadAndImport(@TempDir Path testFolder)
+ {
+ RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED, null);
+ Path stagedPath = testFolder.resolve("slice.zip");
+ when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+ when(mockSlice.isCancelled()).thenReturn(false);
+ when(mockStorageClient.objectExists(mockSlice)).thenReturn(CompletableFuture.completedFuture(null));
+ when(mockStorageClient.downloadObjectIfAbsent(mockSlice)).thenReturn(CompletableFuture.completedFuture(null));
+
+ Promise<RestoreSlice> promise = Promise.promise();
+
+ RestoreSliceTask task = createTaskWithExceptions(mockSlice, job);
+ task.handle(promise);
+
+ assertThatThrownBy(() -> getBlocking(promise.future()))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Random exception");
+ }
+
private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job)
{
when(slice.job()).thenReturn(job);
@@ -236,6 +312,16 @@
0, sliceDatabaseAccessor, stats);
}
+ private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, RestoreJob job)
+ {
+ when(slice.job()).thenReturn(job);
+ assertThat(slice.job()).isSameAs(job);
+ assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
+ assertThat(slice.job().status).isEqualTo(job.status);
+ return new TestUnexpectedExceptionInRestoreSliceTask(slice, mockStorageClient, executorPool,
+ mockSSTableImporter, 0, sliceDatabaseAccessor, stats);
+ }
+
static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor
{
public final AtomicInteger updateInvokedTimes = new AtomicInteger(0);
@@ -269,7 +355,7 @@
}
@Override
- void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
+ Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
{
stats.captureSliceUnzipTime(1, 123L);
stats.captureSliceValidationTime(1, 123L);
@@ -280,12 +366,32 @@
onSuccessCommit.run();
}
event.tryComplete(slice);
+ return Future.succeededFuture();
}
@Override
- void unzipAndImport(Promise<RestoreSlice> event, File file)
+ Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file)
{
- unzipAndImport(event, file, null);
+ return unzipAndImport(event, file, null);
+ }
+ }
+
+ static class TestUnexpectedExceptionInRestoreSliceTask extends RestoreSliceTask
+ {
+ public TestUnexpectedExceptionInRestoreSliceTask(RestoreSlice slice, StorageClient s3Client,
+ TaskExecutorPool executorPool, SSTableImporter importer,
+ double requiredUsableSpacePercentage,
+ RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
+ RestoreJobStats stats)
+ {
+ super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats,
+ null);
+ }
+
+ @Override
+ Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
+ {
+ throw new RuntimeException("Random exception");
}
}
}