CASSANDRASC-105 RestoreSliceTask could be stuck due to missing exception handling (#107)

patch by Saranya Krishnakumar; reviewed by Francisco Guerrero, Yifan Cai for CASSANDRASC-105
diff --git a/CHANGES.txt b/CHANGES.txt
index 0fdc914..e625cde 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105)
  * Make hash algorithm implementation pluggable (CASSANDRASC-114)
  * Fix ClosedChannelException when downloading from S3 (CASSANDRASC-112)
  * Fix NPE thrown when getting StorageClient from pool (CASSANDRASC-110)
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index ecd0734..4e942c7 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -107,7 +107,7 @@
                                 slice.compressedSize() + slice.uncompressedSize(),
                                 requiredUsableSpacePercentage,
                                 executorPool)
-        .onSuccess(ignored -> {
+        .compose(v -> {
             RestoreJob job = slice.job();
             if (job.isManagedBySidecar())
             {
@@ -120,55 +120,63 @@
                         slice.completeStagePhase(); // update the flag if missed
                         sliceDatabaseAccessor.updateStatus(slice);
                         event.tryComplete(slice);
-                        return;
+                        return Future.succeededFuture();
                     }
 
                     // 1. check object existence and validate eTag / checksum
-                    checkObjectExistence(event)
+                    CompletableFuture<Void> fut = checkObjectExistence(event)
                     // 2. download slice/object when the remote object exists
                     .thenCompose(headObject -> downloadSlice(event))
                     // 3. persist status
-                    .thenAccept(x -> {
+                    .thenAccept(file -> {
                         slice.completeStagePhase();
                         sliceDatabaseAccessor.updateStatus(slice);
                         // completed staging. A new task is produced when it comes to import
                         event.tryComplete(slice);
+                    })
+                    .whenComplete((x, cause) -> {
+                        if (cause != null)
+                        {
+                            // fail the event on unexpected errors from the steps above that did not already close it
+                            event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause));
+                        }
                     });
+
+                    return Future.fromCompletionStage(fut);
                 }
                 else if (job.status == RestoreJobStatus.STAGED)
                 {
-                    unzipAndImport(event, slice.stagedObjectPath().toFile(),
-                                   // persist status
-                                   () -> sliceDatabaseAccessor.updateStatus(slice));
+                    return unzipAndImport(event, slice.stagedObjectPath().toFile(),
+                                          // persist status
+                                          () -> sliceDatabaseAccessor.updateStatus(slice));
                 }
                 else
                 {
                     String msg = "Unexpected restore job status. Expected only CREATED or STAGED when " +
                                  "processing active slices. Found status: " + job.status;
                     Exception unexpectedState = new IllegalStateException(msg);
-                    event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status",
-                                                                    slice, unexpectedState));
+                    return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status",
+                                                                                 slice, unexpectedState));
                 }
             }
             else
             {
-                downloadSliceAndImport(event);
+                return downloadSliceAndImport(event);
             }
         })
-        .onFailure(cause -> {
-            String msg = "Unable to ensure enough space for the slice. Retry later";
-            event.tryFail(RestoreJobExceptions.ofSlice(msg, slice, cause));
-        });
+        .onSuccess(v -> event.tryComplete(slice))
+        .onFailure(cause -> event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause)));
     }
 
-    private void downloadSliceAndImport(Promise<RestoreSlice> event)
+    private Future<Void> downloadSliceAndImport(Promise<RestoreSlice> event)
     {
         // 1. check object existence and validate eTag / checksum
-        checkObjectExistence(event)
+        CompletableFuture<File> fut = checkObjectExistence(event)
         // 2. download slice/object when the remote object exists
-        .thenCompose(headObject -> downloadSlice(event))
+        .thenCompose(headObject -> downloadSlice(event));
         // 3. unzip the file and import/commit
-        .thenAccept(file -> unzipAndImport(event, file));
+        return Future.fromCompletionStage(fut)
+                     .compose(file -> unzipAndImport(event, file));
     }
 
     private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice> event)
@@ -281,21 +289,21 @@
     }
 
     @VisibleForTesting
-    void unzipAndImport(Promise<RestoreSlice> event, File file)
+    Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file)
     {
-        unzipAndImport(event, file, null);
+        return unzipAndImport(event, file, null);
     }
 
-    void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
+    Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
     {
         if (file == null) // the condition should never happen. Having it here for logic completeness
         {
-            event.tryFail(RestoreJobExceptions.ofFatalSlice("Object not found from disk", slice, null));
-            return;
+            return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Object not found from disk",
+                                                                         slice, null));
         }
 
         // run the rest in the executor pool, instead of S3 client threadpool
-        unzip(file)
+        return unzip(file)
         .compose(this::validateFiles)
         .compose(this::commit)
         .compose(x -> {
@@ -304,7 +312,7 @@
                 return Future.succeededFuture();
             }
 
-            return executorPool.executeBlocking(promise -> {
+            return executorPool.<Void>executeBlocking(promise -> {
                 onSuccessCommit.run();
                 promise.tryComplete();
             });
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
index 0174e12..f26e859 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
@@ -70,8 +70,8 @@
                  .compose(computedDigest -> {
                      if (!computedDigest.equals(digest.value()))
                      {
-                         logger.error("Digest mismatch. computed_digest={}, expected_digest={}, algorithm=MD5",
-                                      computedDigest, digest.value());
+                         logger.error("Digest mismatch. computed_digest={}, expected_digest={}, algorithm={}",
+                                      computedDigest, digest.value(), digest.algorithm());
                          return Future.failedFuture(new HttpException(CHECKSUM_MISMATCH.code(),
                                                                       String.format("Digest mismatch. "
                                                                                     + "expected_digest=%s, "
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 728f5d2..6e9e1d7 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -31,6 +31,7 @@
 import org.junit.jupiter.api.io.TempDir;
 
 import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
@@ -226,6 +227,81 @@
         assertThat(sliceDatabaseAccessor.updateInvokedTimes.get()).isOne();
     }
 
+    @Test
+    void testHandlingUnexpectedExceptionInObjectExistsCheck(@TempDir Path testFolder)
+    {
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED, "QUORUM");
+        when(mockStorageClient.objectExists(mockSlice)).thenThrow(new RuntimeException("Random exception"));
+        Path stagedPath = testFolder.resolve("slice.zip");
+        when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+
+        Promise<RestoreSlice> promise = Promise.promise();
+
+        RestoreSliceTask task = createTask(mockSlice, job);
+        task.handle(promise);
+
+        assertThatThrownBy(() -> getBlocking(promise.future()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Random exception");
+    }
+
+    @Test
+    void testHandlingUnexpectedExceptionDuringDownloadSliceCheck(@TempDir Path testFolder)
+    {
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED, "QUORUM");
+        Path stagedPath = testFolder.resolve("slice.zip");
+        when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+        when(mockSlice.isCancelled()).thenReturn(false);
+        when(mockStorageClient.objectExists(mockSlice)).thenReturn(CompletableFuture.completedFuture(null));
+        when(mockStorageClient.downloadObjectIfAbsent(mockSlice)).thenThrow(new RuntimeException("Random exception"));
+
+        Promise<RestoreSlice> promise = Promise.promise();
+
+        RestoreSliceTask task = createTask(mockSlice, job);
+        task.handle(promise);
+
+        assertThatThrownBy(() -> getBlocking(promise.future()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Random exception");
+    }
+
+    @Test
+    void testHandlingUnexpectedExceptionDuringUnzip(@TempDir Path testFolder) throws IOException
+    {
+
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED, "QUORUM");
+        Path stagedPath = testFolder.resolve("slice.zip");
+        Files.createFile(stagedPath);
+        when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+        Promise<RestoreSlice> promise = Promise.promise();
+        RestoreSliceTask task = createTaskWithExceptions(mockSlice, job);
+        task.handle(promise);
+
+        assertThatThrownBy(() -> getBlocking(promise.future()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Random exception");
+    }
+
+    @Test
+    void testHandlingUnexpectedExceptionDuringDownloadAndImport(@TempDir Path testFolder)
+    {
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED, null);
+        Path stagedPath = testFolder.resolve("slice.zip");
+        when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+        when(mockSlice.isCancelled()).thenReturn(false);
+        when(mockStorageClient.objectExists(mockSlice)).thenReturn(CompletableFuture.completedFuture(null));
+        when(mockStorageClient.downloadObjectIfAbsent(mockSlice)).thenReturn(CompletableFuture.completedFuture(null));
+
+        Promise<RestoreSlice> promise = Promise.promise();
+
+        RestoreSliceTask task = createTaskWithExceptions(mockSlice, job);
+        task.handle(promise);
+
+        assertThatThrownBy(() -> getBlocking(promise.future()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Random exception");
+    }
+
     private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job)
     {
         when(slice.job()).thenReturn(job);
@@ -236,6 +312,16 @@
                                         0, sliceDatabaseAccessor, stats);
     }
 
+    private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, RestoreJob job)
+    {
+        when(slice.job()).thenReturn(job);
+        assertThat(slice.job()).isSameAs(job);
+        assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
+        assertThat(slice.job().status).isEqualTo(job.status);
+        return new TestUnexpectedExceptionInRestoreSliceTask(slice, mockStorageClient, executorPool,
+                                                             mockSSTableImporter, 0, sliceDatabaseAccessor, stats);
+    }
+
     static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor
     {
         public final AtomicInteger updateInvokedTimes = new AtomicInteger(0);
@@ -269,7 +355,7 @@
         }
 
         @Override
-        void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
+        Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
         {
             stats.captureSliceUnzipTime(1, 123L);
             stats.captureSliceValidationTime(1, 123L);
@@ -280,12 +366,32 @@
                 onSuccessCommit.run();
             }
             event.tryComplete(slice);
+            return Future.succeededFuture();
         }
 
         @Override
-        void unzipAndImport(Promise<RestoreSlice> event, File file)
+        Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file)
         {
-            unzipAndImport(event, file, null);
+            return unzipAndImport(event, file, null);
+        }
+    }
+
+    static class TestUnexpectedExceptionInRestoreSliceTask extends RestoreSliceTask
+    {
+        public TestUnexpectedExceptionInRestoreSliceTask(RestoreSlice slice, StorageClient s3Client,
+                                                         TaskExecutorPool executorPool, SSTableImporter importer,
+                                                         double requiredUsableSpacePercentage,
+                                                         RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
+                                                         RestoreJobStats stats)
+        {
+            super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats,
+                  null);
+        }
+
+        @Override
+        Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit)
+        {
+            throw new RuntimeException("Random exception");
         }
     }
 }