CASSANDRASC-107: Improve logging for slice restore task (#108)

Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-107
diff --git a/CHANGES.txt b/CHANGES.txt
index ce39b66..638ee31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Improve logging for slice restore task (CASSANDRASC-107)
  * Add restore task watcher to report long running tasks (CASSANDRASC-106)
  * RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105)
  * Make hash algorithm implementation pluggable (CASSANDRASC-114)
@@ -82,4 +83,4 @@
  * Add integration tests task (CASSANDRA-15031)
  * Add support for SSL and bindable address (CASSANDRA-15030)
  * Autogenerate API docs for sidecar (CASSANDRA-15028)
- * C* Management process (CASSANDRA-14395)
\ No newline at end of file
+ * C* Management process (CASSANDRA-14395)
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 8651d53..be488ae 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -78,9 +78,9 @@
     {
         try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile.toPath())))
         {
-            ZipEntry zipEntry = zis.getNextEntry();
+            ZipEntry zipEntry;
 
-            while (zipEntry != null)
+            while ((zipEntry = zis.getNextEntry()) != null)
             {
                 // Encounters a directory inside the zip file
                 // It is not expected. The zip file should have the directory depth of 1.
@@ -92,8 +92,6 @@
 
                 File targetFile = newProtectedTargetFile(zipEntry, targetDir);
                 Files.copy(zis, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
-
-                zipEntry = zis.getNextEntry();
             }
             zis.closeEntry();
         }
@@ -161,7 +159,7 @@
                 }
                 catch (IOException e)
                 {
-                    LOGGER.error("Unexpected error occurred while cleaning directory {}, ", path, e);
+                    LOGGER.error("Unexpected error occurred while cleaning directory {}", path, e);
                     throw new RuntimeException(e);
                 }
             });
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index ed85d3f..8c828d8 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -21,7 +21,6 @@
 import java.io.File;
 import java.nio.file.Files;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -48,6 +47,7 @@
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 
+import static io.vertx.core.Future.fromCompletionStage;
 import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSufficientStorage;
 
 /**
@@ -55,7 +55,7 @@
  * and imports SSTables into Cassandra.
  * It the execution ever fails, the cause should only be
  * {@link org.apache.cassandra.sidecar.exceptions.RestoreJobException}
- *
+ * <p>
  * Note that the class is package private, and it is not intended to be referenced by other packages.
  */
 public class RestoreSliceTask implements RestoreSliceHandler
@@ -130,25 +130,17 @@
                     }
 
                     // 1. check object existence and validate eTag / checksum
-                    CompletableFuture<Void> fut = checkObjectExistence(event)
-                    // 2. download slice/object when the remote object exists
-                    .thenCompose(headObject -> downloadSlice(event))
-                    // 3. persist status
-                    .thenAccept(file -> {
-                        slice.completeStagePhase();
-                        sliceDatabaseAccessor.updateStatus(slice);
-                        // completed staging. A new task is produced when it comes to import
-                        event.tryComplete(slice);
-                    })
-                    .whenComplete((x, cause) -> {
-                        if (cause != null)
-                        {
-                            // handle unexpected errors thrown during download slice call, that do not close event
-                            event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause));
-                        }
-                    });
-
-                    return Future.fromCompletionStage(fut);
+                    return checkObjectExistence(event)
+                           .compose(headObject -> downloadSlice(event))
+                           .<Void>compose(file -> {
+                               slice.completeStagePhase();
+                               sliceDatabaseAccessor.updateStatus(slice);
+                               return Future.succeededFuture();
+                           })
+                           // Staging is complete. A separate task is produced later for the import phase
+                           .onSuccess(_v -> event.tryComplete(slice))
+                           // handle unexpected errors from the slice download that did not fail the event
+                           .onFailure(cause -> event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause)));
                 }
                 else if (job.status == RestoreJobStatus.STAGED)
                 {
@@ -177,75 +169,73 @@
     private Future<Void> downloadSliceAndImport(Promise<RestoreSlice> event)
     {
         // 1. check object existence and validate eTag / checksum
-        CompletableFuture<File> fut = checkObjectExistence(event)
-        // 2. download slice/object when the remote object exists
-        .thenCompose(headObject -> downloadSlice(event));
-        // 3. unzip the file and import/commit
-        return Future.fromCompletionStage(fut)
-                     .compose(file -> unzipAndImport(event, file));
+        return checkObjectExistence(event)
+               // 2. download slice/object when the remote object exists
+               .compose(headObject -> downloadSlice(event))
+               // 3. unzip the file and import/commit
+               .compose(file -> unzipAndImport(event, file));
     }
 
-    private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice> event)
+    private Future<?> checkObjectExistence(Promise<RestoreSlice> event)
     {
         // skip query s3 if the object existence is already confirmed
         if (slice.existsOnS3())
         {
-            return CompletableFuture.completedFuture(null);
+            LOGGER.debug("The slice already exists on S3. jobId={} sliceKey={}", slice.jobId(), slice.key());
+            return Future.succeededFuture();
         }
 
-        return s3Client
-        .objectExists(slice) // even if the file already exists on disk, we should still check the object existence
-        .whenComplete((resp, cause) -> {
-            if (cause == null)
-            {
-                stats.captureSliceReplicationTime(currentTimeInNanos() - slice.creationTimeNanos());
-                slice.setExistsOnS3();
-                return;
-            }
-
+        // even if the file already exists on disk, we should still check the object existence
+        return
+        fromCompletionStage(s3Client.objectExists(slice))
+        .onSuccess(exists -> {
+            long durationNanos = currentTimeInNanos() - slice.creationTimeNanos();
+            stats.captureSliceReplicationTime(durationNanos);
+            slice.setExistsOnS3();
+            LOGGER.debug("Slice is now available on S3. jobId={} sliceKey={} replicationTimeNanos={}",
+                         slice.jobId(), slice.key(), durationNanos);
+        })
+        .onFailure(cause -> {
             S3Exception s3Exception = ThrowableUtils.getCause(cause, S3Exception.class);
             if (s3Exception == null) // has non-null cause, but not S3Exception
             {
                 event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected error when checking object existence",
                                                                 slice, cause));
             }
+            else if (s3Exception instanceof NoSuchKeyException)
+            {
+                event.tryFail(RestoreJobExceptions.ofSlice("Object not found", slice, null));
+            }
+            else if (s3Exception.statusCode() == 412)
+            {
+                // When checksum/eTag does not match, it should be an unrecoverable error and fail immediately.
+                // For such a scenario, we expect "S3Exception: (Status Code: 412)". Also see:
+                // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
+                event.tryFail(RestoreJobExceptions.ofFatalSlice("Object checksum mismatched",
+                                                                slice, s3Exception));
+                stats.captureSliceChecksumMismatch(slice.owner().id());
+            }
+            else if (s3Exception.statusCode() == 403)
+            {
+                // Fail immediately if 403 forbidden is returned.
+                // There might be permission issue on accessing the object.
+                event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access is forbidden",
+                                                                slice, s3Exception));
+                stats.captureTokenUnauthorized();
+            }
+            else if (s3Exception.statusCode() == 400 &&
+                     s3Exception.getMessage().contains("token has expired"))
+            {
+                // Fail the job on a 400 response whose message says the token has expired.
+                // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
+                event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has expired", slice, s3Exception));
+                stats.captureTokenExpired();
+            }
             else
             {
-                if (s3Exception instanceof NoSuchKeyException)
-                {
-                    event.tryFail(RestoreJobExceptions.ofSlice("Object not found", slice, null));
-                }
-                else if (s3Exception.statusCode() == 412)
-                {
-                    // When checksum/eTag does not match, it should be an unrecoverable error and fail immediately.
-                    // For such scenario, we expect "S3Exception: (Status Code: 412)". Also see,
-                    // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
-                    event.tryFail(RestoreJobExceptions.ofFatalSlice("Object checksum mismatched",
-                                                                    slice, s3Exception));
-                    stats.captureSliceChecksumMismatch(slice.owner().id());
-                }
-                else if (s3Exception.statusCode() == 403)
-                {
-                    // Fail immediately if 403 forbidden is returned.
-                    // There might be permission issue on accessing the object.
-                    event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access is forbidden",
-                                                                    slice, s3Exception));
-                    stats.captureTokenUnauthorized();
-                }
-                else if (s3Exception.statusCode() == 400 &&
-                         s3Exception.getMessage().contains("token has expired"))
-                {
-                    // Fail the job if 400, token has expired.
-                    // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
-                    event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has expired", slice, s3Exception));
-                    stats.captureTokenExpired();
-                }
-                else
-                {
-                    // Retry the other S3Exceptions
-                    event.tryFail(RestoreJobExceptions.ofSlice("Unable to check object existence",
-                                                               slice, s3Exception));
-                }
+                // Retry the other S3Exceptions
+                event.tryFail(RestoreJobExceptions.ofSlice("Unable to check object existence",
+                                                           slice, s3Exception));
             }
         });
     }
@@ -255,16 +245,14 @@
         return restoreJobUtil.currentTimeNanos();
     }
 
-    private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event)
+    private Future<File> downloadSlice(Promise<RestoreSlice> event)
     {
         if (slice.isCancelled())
         {
             RestoreJobFatalException ex = RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
                                                                             slice, null);
             event.tryFail(ex);
-            CompletableFuture<File> failedFuture = new CompletableFuture<>();
-            failedFuture.completeExceptionally(ex);
-            return failedFuture;
+            return Future.failedFuture(ex);
         }
 
         if (slice.downloadAttempt() > 0)
@@ -274,20 +262,17 @@
         }
 
         LOGGER.info("Begin downloading restore slice. sliceKey={}", slice.key());
-        CompletableFuture<File> future = s3Client
-        .downloadObjectIfAbsent(slice)
-        .whenComplete((file, cause) -> {
-            if (cause != null)
+        Future<File> future =
+        fromCompletionStage(s3Client.downloadObjectIfAbsent(slice))
+        .onFailure(cause -> {
+            slice.incrementDownloadAttempt();
+            if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) != null)
             {
-                slice.incrementDownloadAttempt();
-                if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) != null)
-                {
-                    LOGGER.warn("Downloading restore slice times out. sliceKey={}", slice.key());
-                    stats.captureSliceDownloadTimeout(slice.owner().id());
-                }
-                event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable error when downloading object",
-                                                                slice, cause));
+                LOGGER.warn("Downloading restore slice times out. sliceKey={}", slice.key());
+                stats.captureSliceDownloadTimeout(slice.owner().id());
             }
+            event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable error when downloading object",
+                                                            slice, cause));
         });
 
         return Timer.measureTimeTaken(future, duration -> {
@@ -315,28 +300,28 @@
 
         // run the rest in the executor pool, instead of S3 client threadpool
         return unzip(file)
-        .compose(this::validateFiles)
-        .compose(this::commit)
-        .compose(x -> {
-            if (onSuccessCommit == null)
-            {
-                return Future.succeededFuture();
-            }
+               .compose(this::validateFiles)
+               .compose(this::commit)
+               .compose(x -> {
+                   if (onSuccessCommit == null)
+                   {
+                       return Future.succeededFuture();
+                   }
 
-            return executorPool.<Void>executeBlocking(promise -> {
-                onSuccessCommit.run();
-                promise.tryComplete();
-            });
-        })
-        .onSuccess(x -> {
-            slice.completeImportPhase();
-            event.tryComplete(slice);
-        })
-        .onFailure(failure -> {
-            logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
-            event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. "
-                                                         + slice.shortDescription(), failure));
-        });
+                   return executorPool.<Void>executeBlocking(promise -> {
+                       onSuccessCommit.run();
+                       promise.tryComplete();
+                   });
+               })
+               .onSuccess(x -> {
+                   slice.completeImportPhase();
+                   event.tryComplete(slice);
+               })
+               .onFailure(failure -> {
+                   logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
+                   event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. "
+                                                                + slice.shortDescription(), failure));
+               });
     }
 
     private Future<File> unzip(File zipFile)
@@ -358,7 +343,8 @@
             {
                 if (targetDirExist)
                 {
-                    LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task?");
+                    LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task? " +
+                                 "jobId={} sliceKey={}", slice.jobId(), slice.key());
                     promise.complete(targetDir);
                 }
                 else
@@ -381,8 +367,8 @@
                 // Then, delete the downloaded zip file
                 if (!zipFile.delete())
                 {
-                    LOGGER.warn("File deletion attempt failed. file={}",
-                                zipFile.getAbsolutePath());
+                    LOGGER.warn("File deletion attempt failed. jobId={} sliceKey={} file={}",
+                                slice.jobId(), slice.key(), zipFile.getAbsolutePath());
                 }
             }
             catch (Exception cause)
@@ -487,7 +473,7 @@
             return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
                                                                          slice, null));
 
-        LOGGER.info("Begin committing SSTables. sliceKey={}", slice.key());
+        LOGGER.info("Begin committing SSTables. jobId={} sliceKey={}", slice.jobId(), slice.key());
 
         SSTableImportOptions options = slice.job().importOptions;
         SSTableImporter.ImportOptions importOptions = new SSTableImporter.ImportOptions.Builder()
@@ -505,8 +491,8 @@
                                                       .uploadId(slice.uploadId())
                                                       .build();
         Future<Void> future = importer.scheduleImport(importOptions)
-                                      .onSuccess(ignored -> LOGGER.info("Finish committing SSTables. sliceKey={}",
-                                                                        slice.key()));
+                                      .onSuccess(ignored -> LOGGER.info("Finish committing SSTables. jobId={} sliceKey={}",
+                                                                        slice.jobId(), slice.key()));
         return Timer.measureTimeTaken(future, d -> stats.captureSliceImportTime(slice.owner().id(), d));
     }
 
@@ -529,15 +515,16 @@
             return;
         }
 
-        LOGGER.warn("Committing slice failed with HttpException. slice={} statusCode={} exceptionPayload={}",
-                    slice.sliceId(), httpException.getStatusCode(), httpException.getPayload(), httpException);
+        LOGGER.warn("Committing slice failed with HttpException. jobId={} sliceKey={} statusCode={} " +
+                    "exceptionPayload={}", slice.jobId(), slice.key(), httpException.getStatusCode(),
+                    httpException.getPayload(), httpException);
     }
 
     @Override
     public long elapsedInNanos()
     {
         return taskStartTimeNanos == -1 ? -1 :
-        currentTimeInNanos() - taskStartTimeNanos;
+               currentTimeInNanos() - taskStartTimeNanos;
     }
 
     @Override
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index 9bf4f0e..2795b7b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -105,6 +105,8 @@
     /**
      * Revoke the credentials of a {@link RestoreJob}
      * It should be called when the job is in a final {@link org.apache.cassandra.sidecar.common.data.RestoreJobStatus}
+     *
+     * @param jobId the unique identifier for the job
      */
     public void revokeCredentials(UUID jobId)
     {
@@ -132,7 +134,7 @@
                          .build();
 
         return client.headObject(request)
-                     .whenComplete(logCredentialOnRequestFailure(credentials));
+                     .whenComplete(logCredentialOnRequestFailure(slice, credentials));
     }
 
     public CompletableFuture<File> downloadObjectIfAbsent(RestoreSlice slice)
@@ -140,6 +142,7 @@
         Credentials credentials = credentialsProviders.get(slice.jobId());
         if (credentials == null)
         {
+            LOGGER.debug("Credentials to download object not found. jobId={}", slice.jobId());
             CompletableFuture<File> failedFuture = new CompletableFuture<>();
             failedFuture.completeExceptionally(credentialsNotFound(slice));
             return failedFuture;
@@ -156,6 +159,8 @@
         File object = objectPath.toFile();
         if (object.exists())
         {
+            LOGGER.debug("Skipping download, file already exists. jobId={} s3_object={}",
+                         slice.jobId(), slice.stagedObjectPath());
             // Skip downloading if the file already exists on disk. It should be a rare scenario.
             // Note that the on-disk file could be different from the remote object, although the name matches.
             // TODO 1: verify etag does not change after s3 replication and batch copy
@@ -166,10 +171,12 @@
         }
         if (!object.getParentFile().mkdirs())
         {
-            LOGGER.warn("Error occurred while creating directory for S3 object {}", objectPath);
+            LOGGER.warn("Error occurred while creating directory. jobId={} s3_object={}",
+                        slice.jobId(), slice.stagedObjectPath());
         }
-        return rateLimitedGetObject(client, request, objectPath)
-               .whenComplete(logCredentialOnRequestFailure(credentials))
+        LOGGER.info("Downloading object. jobId={} s3_object={}", slice.jobId(), slice.stagedObjectPath());
+        return rateLimitedGetObject(slice, client, request, objectPath)
+               .whenComplete(logCredentialOnRequestFailure(slice, credentials))
                .thenApply(res -> object);
     }
 
@@ -206,13 +213,14 @@
                                          "jobId: " + slice.jobId());
     }
 
-    private BiConsumer<Object, ? super Throwable> logCredentialOnRequestFailure(Credentials credentials)
+    private BiConsumer<Object, ? super Throwable> logCredentialOnRequestFailure(RestoreSlice slice,
+                                                                                Credentials credentials)
     {
         return (ignored, cause) -> {
             if (cause != null)
             {
-                LOGGER.error("GetObjectRequest is not successful. credentials={}",
-                             credentials.readCredentials, cause);
+                LOGGER.error("GetObjectRequest is not successful. jobId={} credentials={}",
+                             slice.jobId(), credentials.readCredentials, cause);
             }
         };
     }
@@ -221,17 +229,21 @@
      * Returns a {@link CompletableFuture} to the {@link GetObjectResponse}. It writes the object from S3 to a file
      * applying rate limiting on the download throughput.
      *
+     * @param slice           the slice to be restored
      * @param client          the S3 client
      * @param request         the {@link GetObjectRequest request}
      * @param destinationPath the path where the object will be persisted
      * @return a {@link CompletableFuture} of the {@link GetObjectResponse}
      */
-    private CompletableFuture<GetObjectResponse> rateLimitedGetObject(S3AsyncClient client,
+    private CompletableFuture<GetObjectResponse> rateLimitedGetObject(RestoreSlice slice,
+                                                                      S3AsyncClient client,
                                                                       GetObjectRequest request,
                                                                       Path destinationPath)
     {
         return client.getObject(request, AsyncResponseTransformer.toPublisher())
-                     .thenCompose(responsePublisher -> subscribeRateLimitedWrite(destinationPath, responsePublisher));
+                     .thenCompose(responsePublisher -> subscribeRateLimitedWrite(slice,
+                                                                                 destinationPath,
+                                                                                 responsePublisher));
     }
 
     /**
@@ -239,11 +251,13 @@
      * by subscribing to the {@code publisher}. Applying backpressure on the received bytes by rate limiting
      * the download throughput using the {@code downloadRateLimiter} object.
      *
+     * @param slice           the slice to be restored
      * @param destinationPath the path where the object will be persisted
      * @param publisher       the {@link ResponsePublisher}
      * @return a {@link CompletableFuture} to the {@link GetObjectResponse}
      */
-    CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(Path destinationPath,
+    CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(RestoreSlice slice,
+                                                                   Path destinationPath,
                                                                    ResponsePublisher<GetObjectResponse> publisher)
     {
         WritableByteChannel channel;
@@ -257,10 +271,14 @@
         }
         catch (FileAlreadyExistsException fileAlreadyExistsException)
         {
+            LOGGER.debug("Skipping download, file already exists. jobId={} s3_object={}",
+                         slice.jobId(), slice.stagedObjectPath());
             return CompletableFuture.completedFuture(publisher.response());
         }
         catch (IOException e)
         {
+            LOGGER.error("Error occurred while creating channel. destinationPath={} jobId={} s3_object={}",
+                         destinationPath, slice.jobId(), slice.stagedObjectPath(), e);
             throw new RuntimeException(e);
         }
         // CompletableFuture that will be notified when all events have been consumed or if an error occurs.
@@ -272,6 +290,8 @@
             }
             catch (IOException e)
             {
+                LOGGER.error("Error occurred while downloading. jobId={} s3_object={}",
+                             slice.jobId(), slice.stagedObjectPath(), e);
                 throw new RuntimeException(e);
             }
         }).whenComplete((v, subscribeThrowable) -> closeChannel(channel));