| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.sidecar.restore; |
| |
| import java.io.File; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import io.vertx.core.Future; |
| import io.vertx.core.Handler; |
| import io.vertx.core.Promise; |
| import io.vertx.ext.web.handler.HttpException; |
| import org.apache.cassandra.sidecar.common.data.SSTableImportOptions; |
| import org.apache.cassandra.sidecar.concurrent.ExecutorPools; |
| import org.apache.cassandra.sidecar.db.RestoreJob; |
| import org.apache.cassandra.sidecar.db.RestoreSlice; |
| import org.apache.cassandra.sidecar.exceptions.RestoreJobException; |
| import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; |
| import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; |
| import org.apache.cassandra.sidecar.exceptions.ThrowableUtils; |
| import org.apache.cassandra.sidecar.stats.RestoreJobStats; |
| import org.apache.cassandra.sidecar.stats.Timer; |
| import org.apache.cassandra.sidecar.utils.SSTableImporter; |
| import software.amazon.awssdk.core.exception.ApiCallTimeoutException; |
| import software.amazon.awssdk.services.s3.model.NoSuchKeyException; |
| import software.amazon.awssdk.services.s3.model.S3Exception; |
| |
| import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSufficientStorage; |
| |
| /** |
| * An async executable of {@link RestoreSlice} downloads object from remote, validates, |
| * and imports SSTables into Cassandra. |
| * It the execution ever fails, the cause should only be |
| * {@link org.apache.cassandra.sidecar.exceptions.RestoreJobException} |
| * |
| * Note that the class is package private, and it is not intended to be referenced by other packages. |
| */ |
| public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(RestoreSliceTask.class); |
| |
| private final RestoreJob job; |
| private final RestoreSlice slice; |
| private final StorageClient s3Client; |
| private final ExecutorPools.TaskExecutorPool executorPool; |
| private final SSTableImporter importer; |
| private final double requiredUsableSpacePercentage; |
| private final RestoreJobStats stats; |
| |
| public RestoreSliceTask(RestoreJob job, RestoreSlice slice, |
| StorageClient s3Client, |
| ExecutorPools.TaskExecutorPool executorPool, |
| SSTableImporter importer, |
| double requiredUsableSpacePercentage, |
| RestoreJobStats stats) |
| { |
| this.job = job; |
| this.slice = slice; |
| this.s3Client = s3Client; |
| this.executorPool = executorPool; |
| this.importer = importer; |
| this.requiredUsableSpacePercentage = requiredUsableSpacePercentage; |
| this.stats = stats; |
| } |
| |
| @Override |
| public void handle(Promise<RestoreSlice> event) |
| { |
| if (failOnCancelled(event)) |
| return; |
| |
| // The slice, when being process, requires a total of slice size (download) + uncompressed (unzip) to use. |
| // The protection below guards the slice being process, if the usable disk space falls below the threshold |
| // after considering the slice |
| ensureSufficientStorage(slice.targetPathInStaging().toString(), |
| slice.compressedSize() + slice.uncompressedSize(), |
| requiredUsableSpacePercentage, |
| executorPool) |
| .onSuccess(ignored -> downloadSliceAndImport(event)) |
| .onFailure(cause -> { |
| String msg = "Unable to ensure enough space for the slice. Retry later"; |
| event.tryFail(RestoreJobExceptions.ofSlice(msg, slice, cause)); |
| }); |
| } |
| |
| private void downloadSliceAndImport(Promise<RestoreSlice> event) |
| { |
| // 1. check object existence and validate eTag / checksum |
| checkObjectExistence(event) |
| // 2. download slice/object when the remote object exists |
| .thenCompose(headObject -> downloadSlice(event)) |
| // 3. unzip the file and import/commit |
| .thenAccept(file -> unzipAndImport(event, file)); |
| } |
| |
| private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice> event) |
| { |
| // skip query s3 if the object existence is already confirmed |
| if (slice.existsOnS3()) |
| { |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| return s3Client |
| .objectExists(slice) // even if the file already exists on disk, we should still check the object existence |
| .whenComplete((resp, cause) -> { |
| if (cause == null) |
| { |
| stats.captureSliceReplicationTime(System.nanoTime() - slice.creationTimeNanos()); |
| slice.setExistsOnS3(); |
| return; |
| } |
| |
| S3Exception s3Exception = ThrowableUtils.getCause(cause, S3Exception.class); |
| if (s3Exception == null) // has non-null cause, but not S3Exception |
| { |
| event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected error when checking object existence", |
| slice, cause)); |
| } |
| else |
| { |
| if (s3Exception instanceof NoSuchKeyException) |
| { |
| event.tryFail(RestoreJobExceptions.ofSlice("Object not found", slice, null)); |
| } |
| else if (s3Exception.statusCode() == 412) |
| { |
| // When checksum/eTag does not match, it should be an unrecoverable error and fail immediately. |
| // For such scenario, we expect "S3Exception: (Status Code: 412)". Also see, |
| // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax |
| event.tryFail(RestoreJobExceptions.ofFatalSlice("Object checksum mismatched", |
| slice, s3Exception)); |
| stats.captureSliceChecksumMismatch(slice.owner().id()); |
| } |
| else if (s3Exception.statusCode() == 403) |
| { |
| // Fail immediately if 403 forbidden is returned. |
| // There might be permission issue on accessing the object. |
| event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access is forbidden", |
| slice, s3Exception)); |
| stats.captureTokenUnauthorized(); |
| } |
| else if (s3Exception.statusCode() == 400 && |
| s3Exception.getMessage().contains("token has expired")) |
| { |
| // Fail the job if 400, token has expired. |
| // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList |
| event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has expired", slice, s3Exception)); |
| stats.captureTokenExpired(); |
| } |
| else |
| { |
| // Retry the other S3Exceptions |
| event.tryFail(RestoreJobExceptions.ofSlice("Unable to check object existence", |
| slice, s3Exception)); |
| } |
| } |
| }); |
| } |
| |
| private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event) |
| { |
| if (slice.isCancelled()) |
| { |
| RestoreJobFatalException ex = RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", |
| slice, null); |
| CompletableFuture<File> failedFuture = new CompletableFuture<>(); |
| failedFuture.completeExceptionally(ex); |
| return failedFuture; |
| } |
| |
| if (slice.downloadAttempt() > 0) |
| { |
| LOGGER.debug("Retrying downloading slice. sliceKey={}", slice.key()); |
| stats.captureSliceDownloadRetry(slice.owner().id()); |
| } |
| |
| LOGGER.info("Begin downloading restore slice. sliceKey={}", slice.key()); |
| CompletableFuture<File> future = s3Client |
| .downloadObjectIfAbsent(slice) |
| .whenComplete((file, cause) -> { |
| if (cause != null) |
| { |
| slice.incrementDownloadAttempt(); |
| if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) != null) |
| { |
| LOGGER.warn("Downloading restore slice times out. sliceKey={}", slice.key()); |
| stats.captureSliceDownloadTimeout(slice.owner().id()); |
| } |
| event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable error when downloading object", |
| slice, cause)); |
| } |
| }); |
| |
| return Timer.measureTimeTaken(future, duration -> { |
| LOGGER.info("Finish downloading restore slice. sliceKey={}", slice.key()); |
| stats.captureSliceDownloaded(slice.owner().id(), |
| slice.compressedSize(), |
| slice.uncompressedSize(), |
| duration); |
| }); |
| } |
| |
| @VisibleForTesting |
| void unzipAndImport(Promise<RestoreSlice> event, File file) |
| { |
| if (file == null) // the condition should never happen. Having it here for logic completeness |
| { |
| event.tryFail(RestoreJobExceptions.ofFatalSlice("Object not found from disk", slice, null)); |
| return; |
| } |
| |
| // run the rest in the executor pool, instead of S3 client threadpool |
| unzip(file) |
| .compose(this::validateFiles) |
| .compose(this::commit) |
| .onSuccess(x -> event.tryComplete(slice)) |
| .onFailure(failure -> { |
| logWarnIfHasHttpExceptionCauseOnCommit(failure, slice); |
| event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. " |
| + slice.shortDescription(), failure)); |
| }); |
| } |
| |
| private Future<File> unzip(File zipFile) |
| { |
| Future<File> future = executorPool.executeBlocking(promise -> { |
| if (failOnCancelled(promise)) |
| return; |
| |
| if (!zipFile.exists()) |
| { |
| promise.tryFail(new RestoreJobException("Object not found from disk. File: " + zipFile)); |
| return; |
| } |
| |
| // targetPathInStaging points to the directory named after uploadId |
| // SSTableImporter expects the file system structure to be uploadId/keyspace/table/sstables |
| File targetDir = slice.targetPathInStaging() |
| .resolve(slice.keyspace()) |
| .resolve(slice.table()) |
| .toFile(); |
| if (!targetDir.mkdirs()) |
| { |
| LOGGER.warn("Error occurred while creating directory for holding SSTables for SSTableImporter"); |
| } |
| try |
| { |
| // Remove all existing files under the target directory |
| // The validation step later expects only the files registered in the manifest. |
| RestoreJobUtil.cleanDirectory(targetDir.toPath()); |
| RestoreJobUtil.unzip(zipFile, targetDir); |
| // Notify the next step that unzip is complete |
| promise.complete(targetDir); |
| // Then, delete the downloaded zip file |
| if (!zipFile.delete()) |
| { |
| LOGGER.warn("Error while deleting file {}, please note for space wastage", |
| zipFile.getAbsolutePath()); |
| } |
| } |
| catch (Exception cause) |
| { |
| promise.tryFail(RestoreJobExceptions.propagate("Failed to unzip. File: " + zipFile, cause)); |
| } |
| }, false); // unordered |
| |
| return Timer.measureTimeTaken(future, d -> stats.captureSliceUnzipTime(slice.owner().id(), d)); |
| } |
| |
| // Validate integrity of the files from the zip. The failures from any step is fatal and not retryable. |
| private Future<File> validateFiles(File directory) |
| { |
| Future<File> future = executorPool.executeBlocking(promise -> { |
| if (failOnCancelled(promise)) |
| return; |
| |
| Map<String, String> checksums; |
| try |
| { |
| File manifestFile = new File(directory, RestoreSliceManifest.MANIFEST_FILE_NAME); |
| RestoreSliceManifest manifest = RestoreSliceManifest.read(manifestFile); |
| checksums = manifest.mergeAllChecksums(); |
| } |
| catch (RestoreJobFatalException e) |
| { |
| promise.tryFail(e); |
| return; |
| } |
| |
| if (checksums.isEmpty()) |
| { |
| promise.tryFail(new RestoreJobFatalException("The downloaded slice has no data. " + |
| "Directory: " + directory)); |
| return; |
| } |
| |
| // exclude the manifest file |
| File[] files = directory.listFiles((dir, name) -> !name.equals(RestoreSliceManifest.MANIFEST_FILE_NAME)); |
| if (files == null || files.length != checksums.size()) |
| { |
| String msg = "Number of files does not match. Expected: " + checksums.size() + |
| "; Actual: " + (files == null ? 0 : files.length) + |
| "; Directory: " + directory; |
| promise.tryFail(new RestoreJobFatalException(msg)); |
| return; |
| } |
| |
| compareChecksums(checksums, files, promise); |
| |
| // capture the data component size of sstables |
| for (File file : files) |
| { |
| if (file.getName().endsWith("-Data.db")) |
| { |
| stats.captureSSTableDataComponentSize(slice.owner().id(), file.length()); |
| } |
| } |
| |
| // all files match with the provided checksums |
| promise.tryComplete(directory); |
| }, false); // unordered |
| |
| return Timer.measureTimeTaken(future, d -> stats.captureSliceValidationTime(slice.owner().id(), d)); |
| } |
| |
| private void compareChecksums(Map<String, String> expectedChecksums, File[] files, Promise<?> promise) |
| { |
| for (File file : files) |
| { |
| String name = file.getName(); |
| String expectedChecksum = expectedChecksums.get(name); |
| if (expectedChecksum == null) |
| { |
| promise.tryFail(new RestoreJobFatalException("File not found in manifest. File: " + name)); |
| return; |
| } |
| |
| try |
| { |
| String actualChecksum = RestoreJobUtil.checksum(file); |
| if (!actualChecksum.equals(expectedChecksum)) |
| { |
| String msg = "Checksum does not match. Expected: " + expectedChecksum + |
| "; actual: " + actualChecksum + "; file: " + file; |
| promise.tryFail(new RestoreJobFatalException(msg)); |
| return; |
| } |
| } |
| catch (Exception cause) |
| { |
| promise.tryFail(new RestoreJobFatalException("Failed to calculate checksum. File: " + file)); |
| return; |
| } |
| } |
| } |
| |
| private Future<Void> commit(File directory) |
| { |
| if (slice.isCancelled()) |
| return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", |
| slice, null)); |
| |
| LOGGER.info("Begin committing SSTables. sliceKey={}", slice.key()); |
| |
| SSTableImportOptions options = job.importOptions; |
| SSTableImporter.ImportOptions importOptions = new SSTableImporter.ImportOptions.Builder() |
| .host(slice.owner().host()) |
| .keyspace(slice.keyspace()) |
| .tableName(slice.table()) |
| .directory(directory.toString()) |
| .resetLevel(options.resetLevel()) |
| .clearRepaired(options.clearRepaired()) |
| .verifySSTables(options.verifySSTables()) |
| .verifyTokens(options.verifyTokens()) |
| .invalidateCaches(options.invalidateCaches()) |
| .extendedVerify(options.extendedVerify()) |
| .copyData(options.copyData()) |
| .uploadId(slice.uploadId()) |
| .build(); |
| Future<Void> future = importer.scheduleImport(importOptions) |
| .onSuccess(ignored -> LOGGER.info("Finish committing SSTables. sliceKey={}", |
| slice.key())); |
| return Timer.measureTimeTaken(future, d -> stats.captureSliceImportTime(slice.owner().id(), d)); |
| } |
| |
| private boolean failOnCancelled(Promise<?> promise) |
| { |
| if (slice.isCancelled()) |
| promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", slice, null)); |
| |
| return slice.isCancelled(); |
| } |
| |
| // SSTableImporter could fail an import with HttpException, |
| // which does not implement toString to log the details, i.e. status code and payload |
| // The method is to log the details if it finds the cause contains HttpException |
| private void logWarnIfHasHttpExceptionCauseOnCommit(Throwable throwable, RestoreSlice slice) |
| { |
| HttpException httpException = ThrowableUtils.getCause(throwable, HttpException.class); |
| if (httpException == null) |
| { |
| return; |
| } |
| |
| LOGGER.warn("Committing slice failed with HttpException. slice={} statusCode={} exceptionPayload={}", |
| slice.sliceId(), httpException.getStatusCode(), httpException.getPayload(), httpException); |
| } |
| } |