CASSANDRASC-107: Improve logging for slice restore task (#108)
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-107
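
The gist of the change: RestoreSliceTask previously mixed Java
CompletableFuture chains with Vert.x Futures; the task now bridges the
S3 client's CompletableFutures into Vert.x Futures once, via the
statically imported Future.fromCompletionStage, and composes the rest of
the pipeline with compose/onSuccess/onFailure. That flattens the nested
S3Exception handling into a single if/else chain and removes the manual
failed-CompletableFuture plumbing. On top of the refactor, log lines in
RestoreSliceTask, RestoreJobUtil, and StorageClient gain jobId, sliceKey,
and s3_object context, and the RestoreSlice is threaded through the
StorageClient download helpers so request failures log with job context.

Below is a minimal, self-contained sketch of the bridging pattern the
refactor relies on; it is illustrative only and not part of the patch.
FutureBridgeSketch and headObject are hypothetical stand-ins for the
patched calls (s3Client.objectExists / downloadObjectIfAbsent), and the
sketch assumes vertx-core 4.x on the classpath. completedFuture keeps
the demo deterministic.

    import java.util.concurrent.CompletableFuture;

    import io.vertx.core.Future;

    public class FutureBridgeSketch
    {
        // Hypothetical stand-in for an async SDK call that returns a
        // CompletableFuture, e.g. s3Client.objectExists(slice).
        static CompletableFuture<String> headObject()
        {
            return CompletableFuture.completedFuture("etag-1234");
        }

        public static void main(String[] args)
        {
            // Bridge into a Vert.x Future once, then stay in one composition
            // style, as the patch does with fromCompletionStage(...).
            Future.fromCompletionStage(headObject())
                  .compose(etag -> Future.succeededFuture("object with " + etag))
                  .onSuccess(result -> System.out.println("completed: " + result))
                  .onFailure(cause -> System.err.println("failed: " + cause));
        }
    }

One effect visible throughout the diff: failure handling that previously
null-checked the cause inside whenComplete((result, cause) -> ...) now
lives in a dedicated onFailure callback, which is also where the new
jobId/sliceKey logging lands.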
diff --git a/CHANGES.txt b/CHANGES.txt
index ce39b66..638ee31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Improve logging for slice restore task (CASSANDRASC-107)
* Add restore task watcher to report long running tasks (CASSANDRASC-106)
* RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105)
* Make hash algorithm implementation pluggable (CASSANDRASC-114)
@@ -82,4 +83,4 @@
* Add integration tests task (CASSANDRA-15031)
* Add support for SSL and bindable address (CASSANDRA-15030)
* Autogenerate API docs for sidecar (CASSANDRA-15028)
- * C* Management process (CASSANDRA-14395)
\ No newline at end of file
+ * C* Management process (CASSANDRA-14395)
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 8651d53..be488ae 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -78,9 +78,9 @@
{
try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile.toPath())))
{
-            ZipEntry zipEntry = zis.getNextEntry();
-            while (zipEntry != null)
+            ZipEntry zipEntry;
+            while ((zipEntry = zis.getNextEntry()) != null)
{
// Encounters a directory inside the zip file
// It is not expected. The zip file should have the directory depth of 1.
@@ -92,8 +92,6 @@
File targetFile = newProtectedTargetFile(zipEntry, targetDir);
Files.copy(zis, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
-
- zipEntry = zis.getNextEntry();
}
zis.closeEntry();
}
@@ -161,7 +159,7 @@
}
catch (IOException e)
{
- LOGGER.error("Unexpected error occurred while cleaning directory {}, ", path, e);
+ LOGGER.error("Unexpected error occurred while cleaning directory {}", path, e);
throw new RuntimeException(e);
}
});
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index ed85d3f..8c828d8 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -21,7 +21,6 @@
import java.io.File;
import java.nio.file.Files;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -48,6 +47,7 @@
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.S3Exception;
+import static io.vertx.core.Future.fromCompletionStage;
import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSufficientStorage;
/**
@@ -55,7 +55,7 @@
* and imports SSTables into Cassandra.
 * If the execution ever fails, the cause should only be
* {@link org.apache.cassandra.sidecar.exceptions.RestoreJobException}
- *
+ * <p>
* Note that the class is package private, and it is not intended to be referenced by other packages.
*/
public class RestoreSliceTask implements RestoreSliceHandler
@@ -130,25 +130,17 @@
}
// 1. check object existence and validate eTag / checksum
- CompletableFuture<Void> fut = checkObjectExistence(event)
- // 2. download slice/object when the remote object exists
- .thenCompose(headObject -> downloadSlice(event))
- // 3. persist status
- .thenAccept(file -> {
- slice.completeStagePhase();
- sliceDatabaseAccessor.updateStatus(slice);
- // completed staging. A new task is produced when it comes to import
- event.tryComplete(slice);
- })
- .whenComplete((x, cause) -> {
- if (cause != null)
- {
- // handle unexpected errors thrown during download slice call, that do not close event
- event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause));
- }
- });
-
- return Future.fromCompletionStage(fut);
+ return checkObjectExistence(event)
+ .compose(headObject -> downloadSlice(event))
+ .<Void>compose(file -> {
+ slice.completeStagePhase();
+ sliceDatabaseAccessor.updateStatus(slice);
+ return Future.succeededFuture();
+ })
+               // staging completed; a new task is produced when it is time to import
+               .onSuccess(_v -> event.tryComplete(slice))
+               // handle unexpected errors thrown during the download-slice call that do not close the event
+ .onFailure(cause -> event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause)));
}
else if (job.status == RestoreJobStatus.STAGED)
{
@@ -177,75 +169,73 @@
private Future<Void> downloadSliceAndImport(Promise<RestoreSlice> event)
{
// 1. check object existence and validate eTag / checksum
- CompletableFuture<File> fut = checkObjectExistence(event)
- // 2. download slice/object when the remote object exists
- .thenCompose(headObject -> downloadSlice(event));
- // 3. unzip the file and import/commit
- return Future.fromCompletionStage(fut)
- .compose(file -> unzipAndImport(event, file));
+ return checkObjectExistence(event)
+ // 2. download slice/object when the remote object exists
+ .compose(headObject -> downloadSlice(event))
+ // 3. unzip the file and import/commit
+ .compose(file -> unzipAndImport(event, file));
}
- private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice> event)
+ private Future<?> checkObjectExistence(Promise<RestoreSlice> event)
{
// skip query s3 if the object existence is already confirmed
if (slice.existsOnS3())
{
- return CompletableFuture.completedFuture(null);
+ LOGGER.debug("The slice already exists on S3. jobId={} sliceKey={}", slice.jobId(), slice.key());
+ return Future.succeededFuture();
}
- return s3Client
- .objectExists(slice) // even if the file already exists on disk, we should still check the object existence
- .whenComplete((resp, cause) -> {
- if (cause == null)
- {
- stats.captureSliceReplicationTime(currentTimeInNanos() - slice.creationTimeNanos());
- slice.setExistsOnS3();
- return;
- }
-
+ // even if the file already exists on disk, we should still check the object existence
+ return
+ fromCompletionStage(s3Client.objectExists(slice))
+ .onSuccess(exists -> {
+ long durationNanos = currentTimeInNanos() - slice.creationTimeNanos();
+ stats.captureSliceReplicationTime(durationNanos);
+ slice.setExistsOnS3();
+ LOGGER.debug("Slice is now available on S3. jobId={} sliceKey={} replicationTimeNanos={}",
+ slice.jobId(), slice.key(), durationNanos);
+ })
+ .onFailure(cause -> {
S3Exception s3Exception = ThrowableUtils.getCause(cause, S3Exception.class);
if (s3Exception == null) // has non-null cause, but not S3Exception
{
event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected error when checking object existence",
slice, cause));
}
+ else if (s3Exception instanceof NoSuchKeyException)
+ {
+ event.tryFail(RestoreJobExceptions.ofSlice("Object not found", slice, null));
+ }
+ else if (s3Exception.statusCode() == 412)
+ {
+ // When checksum/eTag does not match, it should be an unrecoverable error and fail immediately.
+ // For such scenario, we expect "S3Exception: (Status Code: 412)". Also see,
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
+ event.tryFail(RestoreJobExceptions.ofFatalSlice("Object checksum mismatched",
+ slice, s3Exception));
+ stats.captureSliceChecksumMismatch(slice.owner().id());
+ }
+ else if (s3Exception.statusCode() == 403)
+ {
+ // Fail immediately if 403 forbidden is returned.
+                        // There might be a permission issue when accessing the object.
+ event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access is forbidden",
+ slice, s3Exception));
+ stats.captureTokenUnauthorized();
+ }
+ else if (s3Exception.statusCode() == 400 &&
+ s3Exception.getMessage().contains("token has expired"))
+ {
+                        // Fail the job on 400 when the token has expired.
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
+ event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has expired", slice, s3Exception));
+ stats.captureTokenExpired();
+ }
else
{
- if (s3Exception instanceof NoSuchKeyException)
- {
- event.tryFail(RestoreJobExceptions.ofSlice("Object not found", slice, null));
- }
- else if (s3Exception.statusCode() == 412)
- {
- // When checksum/eTag does not match, it should be an unrecoverable error and fail immediately.
- // For such scenario, we expect "S3Exception: (Status Code: 412)". Also see,
- // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Object checksum mismatched",
- slice, s3Exception));
- stats.captureSliceChecksumMismatch(slice.owner().id());
- }
- else if (s3Exception.statusCode() == 403)
- {
- // Fail immediately if 403 forbidden is returned.
- // There might be permission issue on accessing the object.
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access is forbidden",
- slice, s3Exception));
- stats.captureTokenUnauthorized();
- }
- else if (s3Exception.statusCode() == 400 &&
- s3Exception.getMessage().contains("token has expired"))
- {
- // Fail the job if 400, token has expired.
- // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has expired", slice, s3Exception));
- stats.captureTokenExpired();
- }
- else
- {
- // Retry the other S3Exceptions
- event.tryFail(RestoreJobExceptions.ofSlice("Unable to check object existence",
- slice, s3Exception));
- }
+ // Retry the other S3Exceptions
+ event.tryFail(RestoreJobExceptions.ofSlice("Unable to check object existence",
+ slice, s3Exception));
}
});
}
@@ -255,16 +245,14 @@
return restoreJobUtil.currentTimeNanos();
}
- private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event)
+ private Future<File> downloadSlice(Promise<RestoreSlice> event)
{
if (slice.isCancelled())
{
RestoreJobFatalException ex = RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
slice, null);
event.tryFail(ex);
- CompletableFuture<File> failedFuture = new CompletableFuture<>();
- failedFuture.completeExceptionally(ex);
- return failedFuture;
+ return Future.failedFuture(ex);
}
if (slice.downloadAttempt() > 0)
@@ -274,20 +262,17 @@
}
LOGGER.info("Begin downloading restore slice. sliceKey={}", slice.key());
- CompletableFuture<File> future = s3Client
- .downloadObjectIfAbsent(slice)
- .whenComplete((file, cause) -> {
- if (cause != null)
+ Future<File> future =
+ fromCompletionStage(s3Client.downloadObjectIfAbsent(slice))
+ .onFailure(cause -> {
+ slice.incrementDownloadAttempt();
+ if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) != null)
{
- slice.incrementDownloadAttempt();
- if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) != null)
- {
- LOGGER.warn("Downloading restore slice times out. sliceKey={}", slice.key());
- stats.captureSliceDownloadTimeout(slice.owner().id());
- }
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable error when downloading object",
- slice, cause));
+ LOGGER.warn("Downloading restore slice times out. sliceKey={}", slice.key());
+ stats.captureSliceDownloadTimeout(slice.owner().id());
}
+ event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable error when downloading object",
+ slice, cause));
});
return Timer.measureTimeTaken(future, duration -> {
@@ -315,28 +300,28 @@
// run the rest in the executor pool, instead of S3 client threadpool
return unzip(file)
- .compose(this::validateFiles)
- .compose(this::commit)
- .compose(x -> {
- if (onSuccessCommit == null)
- {
- return Future.succeededFuture();
- }
+ .compose(this::validateFiles)
+ .compose(this::commit)
+ .compose(x -> {
+ if (onSuccessCommit == null)
+ {
+ return Future.succeededFuture();
+ }
- return executorPool.<Void>executeBlocking(promise -> {
- onSuccessCommit.run();
- promise.tryComplete();
- });
- })
- .onSuccess(x -> {
- slice.completeImportPhase();
- event.tryComplete(slice);
- })
- .onFailure(failure -> {
- logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
- event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. "
- + slice.shortDescription(), failure));
- });
+ return executorPool.<Void>executeBlocking(promise -> {
+ onSuccessCommit.run();
+ promise.tryComplete();
+ });
+ })
+ .onSuccess(x -> {
+ slice.completeImportPhase();
+ event.tryComplete(slice);
+ })
+ .onFailure(failure -> {
+ logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
+ event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. "
+ + slice.shortDescription(), failure));
+ });
}
private Future<File> unzip(File zipFile)
@@ -358,7 +343,8 @@
{
if (targetDirExist)
{
- LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task?");
+ LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task? " +
+ "jobId={} sliceKey={}", slice.jobId(), slice.key());
promise.complete(targetDir);
}
else
@@ -381,8 +367,8 @@
// Then, delete the downloaded zip file
if (!zipFile.delete())
{
- LOGGER.warn("File deletion attempt failed. file={}",
- zipFile.getAbsolutePath());
+ LOGGER.warn("File deletion attempt failed. jobId={} sliceKey={} file={}",
+ slice.jobId(), slice.key(), zipFile.getAbsolutePath());
}
}
catch (Exception cause)
@@ -487,7 +473,7 @@
return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
slice, null));
- LOGGER.info("Begin committing SSTables. sliceKey={}", slice.key());
+ LOGGER.info("Begin committing SSTables. jobId={} sliceKey={}", slice.jobId(), slice.key());
SSTableImportOptions options = slice.job().importOptions;
SSTableImporter.ImportOptions importOptions = new SSTableImporter.ImportOptions.Builder()
@@ -505,8 +491,8 @@
.uploadId(slice.uploadId())
.build();
Future<Void> future = importer.scheduleImport(importOptions)
- .onSuccess(ignored -> LOGGER.info("Finish committing SSTables. sliceKey={}",
- slice.key()));
+ .onSuccess(ignored -> LOGGER.info("Finish committing SSTables. jobId={} sliceKey={}",
+ slice.jobId(), slice.key()));
return Timer.measureTimeTaken(future, d -> stats.captureSliceImportTime(slice.owner().id(), d));
}
@@ -529,15 +515,16 @@
return;
}
- LOGGER.warn("Committing slice failed with HttpException. slice={} statusCode={} exceptionPayload={}",
- slice.sliceId(), httpException.getStatusCode(), httpException.getPayload(), httpException);
+ LOGGER.warn("Committing slice failed with HttpException. jobId={} sliceKey={} statusCode={} " +
+ "exceptionPayload={}", slice.jobId(), slice.key(), httpException.getStatusCode(),
+ httpException.getPayload(), httpException);
}
@Override
public long elapsedInNanos()
{
return taskStartTimeNanos == -1 ? -1 :
- currentTimeInNanos() - taskStartTimeNanos;
+ currentTimeInNanos() - taskStartTimeNanos;
}
@Override
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index 9bf4f0e..2795b7b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -105,6 +105,8 @@
/**
* Revoke the credentials of a {@link RestoreJob}
* It should be called when the job is in a final {@link org.apache.cassandra.sidecar.common.data.RestoreJobStatus}
+ *
+ * @param jobId the unique identifier for the job
*/
public void revokeCredentials(UUID jobId)
{
@@ -132,7 +134,7 @@
.build();
return client.headObject(request)
- .whenComplete(logCredentialOnRequestFailure(credentials));
+ .whenComplete(logCredentialOnRequestFailure(slice, credentials));
}
public CompletableFuture<File> downloadObjectIfAbsent(RestoreSlice slice)
@@ -140,6 +142,7 @@
Credentials credentials = credentialsProviders.get(slice.jobId());
if (credentials == null)
{
+ LOGGER.debug("Credentials to download object not found. jobId={}", slice.jobId());
CompletableFuture<File> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(credentialsNotFound(slice));
return failedFuture;
@@ -156,6 +159,8 @@
File object = objectPath.toFile();
if (object.exists())
{
+ LOGGER.debug("Skipping download, file already exists. jobId={} s3_object={}",
+ slice.jobId(), slice.stagedObjectPath());
// Skip downloading if the file already exists on disk. It should be a rare scenario.
// Note that the on-disk file could be different from the remote object, although the name matches.
// TODO 1: verify etag does not change after s3 replication and batch copy
@@ -166,10 +171,12 @@
}
if (!object.getParentFile().mkdirs())
{
- LOGGER.warn("Error occurred while creating directory for S3 object {}", objectPath);
+ LOGGER.warn("Error occurred while creating directory. jobId={} s3_object={}",
+ slice.jobId(), slice.stagedObjectPath());
}
- return rateLimitedGetObject(client, request, objectPath)
- .whenComplete(logCredentialOnRequestFailure(credentials))
+ LOGGER.info("Downloading object. jobId={} s3_object={}", slice.jobId(), slice.stagedObjectPath());
+ return rateLimitedGetObject(slice, client, request, objectPath)
+ .whenComplete(logCredentialOnRequestFailure(slice, credentials))
.thenApply(res -> object);
}
@@ -206,13 +213,14 @@
"jobId: " + slice.jobId());
}
- private BiConsumer<Object, ? super Throwable> logCredentialOnRequestFailure(Credentials credentials)
+ private BiConsumer<Object, ? super Throwable> logCredentialOnRequestFailure(RestoreSlice slice,
+ Credentials credentials)
{
return (ignored, cause) -> {
if (cause != null)
{
- LOGGER.error("GetObjectRequest is not successful. credentials={}",
- credentials.readCredentials, cause);
+ LOGGER.error("GetObjectRequest is not successful. jobId={} credentials={}",
+ slice.jobId(), credentials.readCredentials, cause);
}
};
}
@@ -221,17 +229,21 @@
* Returns a {@link CompletableFuture} to the {@link GetObjectResponse}. It writes the object from S3 to a file
* applying rate limiting on the download throughput.
*
+ * @param slice the slice to be restored
* @param client the S3 client
* @param request the {@link GetObjectRequest request}
* @param destinationPath the path where the object will be persisted
* @return a {@link CompletableFuture} of the {@link GetObjectResponse}
*/
- private CompletableFuture<GetObjectResponse> rateLimitedGetObject(S3AsyncClient client,
+ private CompletableFuture<GetObjectResponse> rateLimitedGetObject(RestoreSlice slice,
+ S3AsyncClient client,
GetObjectRequest request,
Path destinationPath)
{
return client.getObject(request, AsyncResponseTransformer.toPublisher())
- .thenCompose(responsePublisher -> subscribeRateLimitedWrite(destinationPath, responsePublisher));
+ .thenCompose(responsePublisher -> subscribeRateLimitedWrite(slice,
+ destinationPath,
+ responsePublisher));
}
/**
@@ -239,11 +251,13 @@
* by subscribing to the {@code publisher}. Applying backpressure on the received bytes by rate limiting
* the download throughput using the {@code downloadRateLimiter} object.
*
+ * @param slice the slice to be restored
* @param destinationPath the path where the object will be persisted
* @param publisher the {@link ResponsePublisher}
* @return a {@link CompletableFuture} to the {@link GetObjectResponse}
*/
- CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(Path destinationPath,
+ CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(RestoreSlice slice,
+ Path destinationPath,
ResponsePublisher<GetObjectResponse> publisher)
{
WritableByteChannel channel;
@@ -257,10 +271,14 @@
}
catch (FileAlreadyExistsException fileAlreadyExistsException)
{
+ LOGGER.debug("Skipping download, file already exists. jobId={} s3_object={}",
+ slice.jobId(), slice.stagedObjectPath());
return CompletableFuture.completedFuture(publisher.response());
}
catch (IOException e)
{
+ LOGGER.error("Error occurred while creating channel. destinationPath={} jobId={} s3_object={}",
+ destinationPath, slice.jobId(), slice.stagedObjectPath(), e);
throw new RuntimeException(e);
}
// CompletableFuture that will be notified when all events have been consumed or if an error occurs.
@@ -272,6 +290,8 @@
}
catch (IOException e)
{
+ LOGGER.error("Error occurred while downloading. jobId={} s3_object={}",
+ slice.jobId(), slice.stagedObjectPath(), e);
throw new RuntimeException(e);
}
}).whenComplete((v, subscribeThrowable) -> closeChannel(channel));