blob: 99babe10168dbb8e93e966ad03cb70e2ea0f81fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.routes.sstableuploads;
import java.util.concurrent.TimeUnit;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.file.FileSystem;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.Configuration;
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.data.SSTableUploadResponse;
import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.data.SSTableUploadRequest;
import org.apache.cassandra.sidecar.routes.AbstractHandler;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.utils.SSTableUploader;
import org.apache.cassandra.sidecar.utils.SSTableUploadsPathBuilder;
import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
/**
* Handler for managing uploaded SSTable components
*/
@Singleton
public class SSTableUploadHandler extends AbstractHandler<SSTableUploadRequest>
{
    private final FileSystem fs;
    private final Configuration configuration;
    private final SSTableUploader uploader;
    private final SSTableUploadsPathBuilder uploadPathBuilder;
    // Caps the number of concurrent uploads; the limit is re-read from configuration
    // on every acquire, so config changes take effect without a restart.
    private final ConcurrencyLimiter limiter;

    /**
     * Constructs a handler with the provided params.
     *
     * @param vertx the vertx instance
     * @param configuration configuration object holding config details of Sidecar
     * @param metadataFetcher the interface to retrieve metadata
     * @param uploader a class that uploads the components
     * @param uploadPathBuilder a class that provides SSTableUploads directories
     * @param executorPools executor pools for blocking executions
     * @param validator a validator instance to validate Cassandra-specific input
     */
    @Inject
    protected SSTableUploadHandler(Vertx vertx,
                                   Configuration configuration,
                                   InstanceMetadataFetcher metadataFetcher,
                                   SSTableUploader uploader,
                                   SSTableUploadsPathBuilder uploadPathBuilder,
                                   ExecutorPools executorPools,
                                   CassandraInputValidator validator)
    {
        super(metadataFetcher, executorPools, validator);
        this.fs = vertx.fileSystem();
        this.configuration = configuration;
        this.uploader = uploader;
        this.uploadPathBuilder = uploadPathBuilder;
        this.limiter = new ConcurrencyLimiter(configuration::getConcurrentUploadsLimit);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Validates the request, ensures sufficient disk space is available in the staging
     * directory, streams the uploaded component to disk, and responds with the uploaded
     * file's size and the total service time.
     */
    @Override
    public void handleInternal(RoutingContext context,
                               HttpServerRequest httpRequest,
                               String host,
                               SocketAddress remoteAddress,
                               SSTableUploadRequest request)
    {
        // We pause request here, otherwise data streaming will happen before we have our temporary
        // file ready for streaming, and we will see request has already been read error. Hence, we
        // pause request here and resume it when temporary file has been created and is ready to
        // accept the upload.
        httpRequest.pause();
        long startTimeInNanos = System.nanoTime();
        if (!limiter.tryAcquire())
        {
            String message = String.format("Concurrent upload limit (%d) exceeded", limiter.limit());
            context.fail(wrapHttpException(HttpResponseStatus.TOO_MANY_REQUESTS, message));
            return;
        }
        // to make sure that permit is always released, regardless of how the request terminates
        context.addEndHandler(v -> limiter.releasePermit());

        validateKeyspaceAndTable(host, request)
        .compose(validRequest -> uploadPathBuilder.resolveStagingDirectory(host))
        .compose(this::ensureSufficientSpaceAvailable)
        .compose(v -> uploadPathBuilder.build(host, request))
        .compose(uploadDirectory -> uploader.uploadComponent(httpRequest, uploadDirectory, request.component(),
                                                             request.expectedChecksum()))
        .compose(fs::props)
        .onSuccess(fileProps -> {
            long serviceTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
            logger.info("Successfully uploaded SSTable component for request={}, remoteAddress={}, " +
                        "instance={}, sizeInBytes={}, serviceTimeMillis={}",
                        request, remoteAddress, host, fileProps.size(), serviceTimeMillis);
            context.json(new SSTableUploadResponse(request.uploadId(), fileProps.size(), serviceTimeMillis));
        })
        .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected SSTableUploadRequest extractParamsOrThrow(RoutingContext context)
    {
        return SSTableUploadRequest.from(qualifiedTableName(context, true), context);
    }

    /**
     * Ensure that keyspace and table name are valid in the cluster.
     *
     * <p>Fails with SERVICE_UNAVAILABLE when cluster metadata cannot be obtained, and with
     * BAD_REQUEST when the keyspace or table does not exist in the cluster schema.
     *
     * @param host the Cassandra instance host
     * @param request the upload request
     * @return {@link Future} containing a valid {@link SSTableUploadRequest request}
     */
    private Future<SSTableUploadRequest> validateKeyspaceAndTable(String host,
                                                                  SSTableUploadRequest request)
    {
        ExecutorPools.TaskExecutorPool pool = executorPools.service();
        return pool.<Metadata>executeBlocking(promise -> {
            CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
            // The delegate may be unavailable (e.g. while the connection to the Cassandra
            // instance is being established); guard against null so we respond with
            // SERVICE_UNAVAILABLE instead of failing with a NullPointerException.
            Metadata metadata = delegate == null ? null : delegate.metadata();
            if (metadata == null)
            {
                promise.fail(cassandraServiceUnavailable());
            }
            else
            {
                promise.complete(metadata);
            }
        })
        .compose(metadata -> {
            KeyspaceMetadata keyspaceMetadata = metadata.getKeyspace(request.keyspace());
            if (keyspaceMetadata == null)
            {
                String message = String.format("Invalid keyspace '%s' supplied", request.keyspace());
                logger.error(message);
                return Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
            }
            if (keyspaceMetadata.getTable(request.tableName()) == null)
            {
                String message = String.format("Invalid table name '%s' supplied for keyspace '%s'",
                                               request.tableName(), request.keyspace());
                logger.error(message);
                return Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
            }
            return Future.succeededFuture(request);
        });
    }

    /**
     * Ensures there is sufficient space available as per configured in the
     * {@link Configuration#getMinSpacePercentRequiredForUpload()}.
     *
     * <p>A configured minimum of {@code 0} disables the check entirely.
     *
     * @param uploadDirectory the directory where the SSTables are uploaded
     * @return a succeeded future if there is sufficient space available, or failed future otherwise
     */
    private Future<String> ensureSufficientSpaceAvailable(String uploadDirectory)
    {
        float minimumPercentageRequired = configuration.getMinSpacePercentRequiredForUpload();
        if (minimumPercentageRequired == 0)
        {
            return Future.succeededFuture(uploadDirectory);
        }
        return fs.fsProps(uploadDirectory)
                 .compose(fsProps -> {
                     // calculate available disk space percentage
                     long totalSpace = fsProps.totalSpace();
                     long usableSpace = fsProps.usableSpace();
                     // using double for higher precision; guard totalSpace to avoid division by zero
                     double spacePercentAvailable = (usableSpace > 0L && totalSpace > 0L)
                                                    ? ((double) usableSpace / (double) totalSpace) * 100D
                                                    : 0D;
                     return Future.succeededFuture(spacePercentAvailable);
                 })
                 .compose(availableDiskSpacePercentage -> {
                     if (availableDiskSpacePercentage < minimumPercentageRequired)
                     {
                         logger.warn("Insufficient space available for upload in stagingDir={}, available={}%, " +
                                     "required={}%", uploadDirectory,
                                     availableDiskSpacePercentage, minimumPercentageRequired);
                         return Future.failedFuture(wrapHttpException(HttpResponseStatus.INSUFFICIENT_STORAGE,
                                                                      "Insufficient space available for upload"));
                     }
                     return Future.succeededFuture(uploadDirectory);
                 });
    }
}