/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.restore;

import java.io.File;
import java.io.IOException;
import java.nio.channels.Channel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SidecarRateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.sidecar.common.data.StorageCredentials;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreSlice;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.ResponsePublisher;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
/**
* The client to access an S3-compatible storage service
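*
* <p>A minimal usage sketch; the {@code restoreJob} and {@code slice} values are assumed
* to be supplied by the surrounding restore task, and {@code process} is a hypothetical callback:
* <pre>{@code
* StorageClient storageClient = new StorageClient(s3AsyncClient, downloadRateLimiter);
* storageClient.authenticate(restoreJob);      // cache credentials for restoreJob.jobId
* storageClient.downloadObjectIfAbsent(slice)  // stream the object to its staged path
*              .thenAccept(file -> process(file));
* }</pre>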
*/
public class StorageClient
{
private static final Logger LOGGER = LoggerFactory.getLogger(StorageClient.class);
private final S3AsyncClient client;
private final SidecarRateLimiter downloadRateLimiter;
private final Map<UUID, Credentials> credentialsProviders = new ConcurrentHashMap<>();
@VisibleForTesting
StorageClient(S3AsyncClient client)
{
// no rate-limiting
this(client, SidecarRateLimiter.create(-1));
}
StorageClient(S3AsyncClient client, SidecarRateLimiter downloadRateLimiter)
{
this.client = client;
this.downloadRateLimiter = downloadRateLimiter;
}
/**
* Authenticate and cache the credentials of a {@link RestoreJob}
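* <p>Re-authenticating with the same job is a no-op unless the job's read credentials have
* changed, in which case the cached entry is replaced.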
*/
public StorageClient authenticate(RestoreJob restoreJob) throws RestoreJobFatalException
{
Credentials newCredentials = new Credentials(restoreJob);
// Update the credentials if absent or the cached secrets are outdated
credentialsProviders.compute(restoreJob.jobId, (jobId, credentials) -> {
if (credentials == null || !matches(credentials, newCredentials))
{
LOGGER.info("Credentials are updated in the storage client. jobId={} credentials={}",
restoreJob.jobId, newCredentials.readCredentials);
newCredentials.init();
return newCredentials;
}
else
{
return credentials;
}
});
return this;
}
/**
* Revoke the credentials of a {@link RestoreJob}.
* It should be called once the job reaches a final {@link org.apache.cassandra.sidecar.common.data.RestoreJobStatus}
*/
public void revokeCredentials(UUID jobId)
{
LOGGER.info("Revoke credentials for job. jobId={}", jobId);
credentialsProviders.remove(jobId);
}
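/**
* Checks that the object referenced by the slice exists in the remote bucket. The HeadObject
* request is conditioned on the slice's checksum via {@code If-Match}, so it also verifies
* that the remote ETag still matches the expected one.
*
* @param slice the restore slice describing the bucket, key and checksum of the object
* @return a future of the {@link HeadObjectResponse}; the future fails if no credentials are
* cached for the slice's job or the request is unsuccessful
*/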
public CompletableFuture<HeadObjectResponse> objectExists(RestoreSlice slice)
{
Credentials credentials = credentialsProviders.get(slice.jobId());
if (credentials == null)
{
CompletableFuture<HeadObjectResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(credentialsNotFound(slice));
return failedFuture;
}
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
HeadObjectRequest request =
HeadObjectRequest.builder()
.overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider()))
.bucket(slice.bucket())
.key(slice.key())
.ifMatch(quoteIfNeeded(slice.checksum()))
.build();
return client.headObject(request)
.whenComplete(logCredentialOnRequestFailure(credentials));
}
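/**
* Downloads the object referenced by the slice to its staged path, skipping the download when
* a file already exists at that path.
*
* @param slice the restore slice describing the bucket, key and staged path of the object
* @return a future of the staged {@link File}; the future fails if no credentials are cached
* for the slice's job or the download is unsuccessful
*/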
public CompletableFuture<File> downloadObjectIfAbsent(RestoreSlice slice)
{
Credentials credentials = credentialsProviders.get(slice.jobId());
if (credentials == null)
{
CompletableFuture<File> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(credentialsNotFound(slice));
return failedFuture;
}
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
GetObjectRequest request =
GetObjectRequest.builder()
.overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider()))
.bucket(slice.bucket())
.key(slice.key())
.build();
Path objectPath = slice.stagedObjectPath();
File object = objectPath.toFile();
if (object.exists())
{
// Skip downloading if the file already exists on disk. It should be a rare scenario.
// Note that the on-disk file could be different from the remote object, although the name matches.
// TODO 1: verify etag does not change after s3 replication and batch copy
// TODO 2: extend restore_job table to define the multi-part upload chunk size, in order to perform local
// verification of the etag/checksum
// For now, we just skip the download, assuming the scenario is rare and not malicious
return CompletableFuture.completedFuture(object);
}
File parentDir = object.getParentFile();
// mkdirs() returns false when the directory already exists; only warn when creation genuinely failed
if (!parentDir.mkdirs() && !parentDir.exists())
{
LOGGER.warn("Error occurred while creating directory for S3 object {}", objectPath);
}
return rateLimitedGetObject(client, request, objectPath)
.whenComplete(logCredentialOnRequestFailure(credentials))
.thenApply(res -> object);
}
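/**
* Closes the underlying {@link S3AsyncClient}, which also shuts down its internal thread pool.
*/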
public void close()
{
try
{
client.close(); // it closes the thread pool internally
}
catch (Exception ex)
{
LOGGER.warn("Error when closing the S3 client", ex);
}
}
private boolean matches(Credentials c1, Credentials c2)
{
if (c1 == c2)
return true;
return Objects.equals(c1.readCredentials, c2.readCredentials);
}
private String quoteIfNeeded(String input)
{
if (input.startsWith("\"") && input.endsWith("\""))
return input;
return '"' + input + '"';
}
private IllegalStateException credentialsNotFound(RestoreSlice slice)
{
return new IllegalStateException("No credentials available. The job might already have failed. " +
"jobId: " + slice.jobId());
}
private BiConsumer<Object, ? super Throwable> logCredentialOnRequestFailure(Credentials credentials)
{
return (ignored, cause) -> {
if (cause != null)
{
LOGGER.error("S3 request was not successful. credentials={}",
credentials.readCredentials, cause);
}
};
}
/**
* Returns a {@link CompletableFuture} to the {@link GetObjectResponse}. It writes the object from S3 to a file,
* applying rate limiting to the download throughput.
*
* @param client the S3 client
* @param request the {@link GetObjectRequest request}
* @param destinationPath the path where the object will be persisted
* @return a {@link CompletableFuture} of the {@link GetObjectResponse}
*/
private CompletableFuture<GetObjectResponse> rateLimitedGetObject(S3AsyncClient client,
GetObjectRequest request,
Path destinationPath)
{
return client.getObject(request, AsyncResponseTransformer.toPublisher())
.thenCompose(responsePublisher -> subscribeRateLimitedWrite(destinationPath, responsePublisher));
}
/**
* Returns a {@link CompletableFuture} to the {@link GetObjectResponse}, consuming the response
* by subscribing to the {@code publisher}. Backpressure is applied to the received bytes by
* rate limiting the download throughput with the {@code downloadRateLimiter} object.
*
* @param destinationPath the path where the object will be persisted
* @param publisher the {@link ResponsePublisher}
* @return a {@link CompletableFuture} to the {@link GetObjectResponse}
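*
* <p>The consumer passed to {@code publisher.subscribe} blocks in
* {@code downloadRateLimiter.acquire} before writing each chunk; since the publisher delivers
* chunks sequentially, this throttles how quickly further bytes are requested from S3.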
*/
CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(Path destinationPath,
ResponsePublisher<GetObjectResponse> publisher)
{
WritableByteChannel channel;
try
{
// Always create a new file and fail if it already exists.
// This is consistent with the expectation that we won't
// re-download a file that already exists.
// The channel is closed asynchronously when the streaming completes.
channel = Files.newByteChannel(destinationPath, EnumSet.of(CREATE_NEW, WRITE));
}
catch (FileAlreadyExistsException fileAlreadyExistsException)
{
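// The staged file appeared after the earlier existence check; keep it and report completion,
// consistent with the policy of not re-downloading existing files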
return CompletableFuture.completedFuture(publisher.response());
}
catch (IOException e)
{
throw new RuntimeException(e);
}
// CompletableFuture that will be notified when all events have been consumed or if an error occurs.
CompletableFuture<Void> subscribeFuture = publisher.subscribe(buffer -> {
downloadRateLimiter.acquire(buffer.remaining()); // apply backpressure on the received bytes
try
{
channel.write(buffer);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}).whenComplete((v, subscribeThrowable) -> closeChannel(channel));
return subscribeFuture.thenApply(v -> publisher.response());
}
/**
* Closes the channel if it is non-null and open. Any {@link IOException} is logged rather than rethrown.
*
* @param channel the channel to be closed
*/
private void closeChannel(Channel channel)
{
if (channel != null && channel.isOpen())
{
try
{
channel.close();
}
catch (IOException e)
{
LOGGER.error("Failed to close channel", e);
}
}
}
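/**
* Pairs the read {@link StorageCredentials} of a {@link RestoreJob} with the
* {@link AwsCredentialsProvider} derived from them, which is created lazily via {@link #init()}.
*/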
private static class Credentials
{
final StorageCredentials readCredentials;
private AwsCredentialsProvider awsCredential;
Credentials(RestoreJob restoreJob) throws RestoreJobFatalException
{
if (restoreJob.secrets == null)
{
throw new RestoreJobFatalException("Restore job is missing credentials. JobId: " + restoreJob.jobId);
}
this.readCredentials = restoreJob.secrets.readCredentials();
}
void init()
{
AwsCredentials credentials = AwsSessionCredentials.create(readCredentials.accessKeyId(),
readCredentials.secretAccessKey(),
readCredentials.sessionToken());
this.awsCredential = StaticCredentialsProvider.create(credentials);
}
AwsCredentialsProvider awsCredentialsProvider()
{
return awsCredential;
}
}
}