blob: 0f8781760b7604b1a8b6a54b26084b49c2c807c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.routes.snapshots;
import java.io.FileNotFoundException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
import org.apache.cassandra.sidecar.common.server.TableOperations;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.CacheConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.metrics.CacheStatsCounter;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.routes.AbstractHandler;
import org.apache.cassandra.sidecar.routes.data.SnapshotRequestParam;
import org.apache.cassandra.sidecar.snapshots.SnapshotPathBuilder;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.utils.RequestUtils;
import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
/**
 * The <b>GET</b> verb will produce a list of paths of all the snapshot files of a given
 * snapshot name.
 *
 * <p>The query param {@code includeSecondaryIndexFiles} is used to request secondary index
 * files along with other files. For example:
 *
 * <p>{@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot}
 * lists all SSTable component files for the <i>"testSnapshot"</i> snapshot for the
 * <i>"ks"</i> keyspace and the <i>"tbl"</i> table
 *
 * <p>{@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot?includeSecondaryIndexFiles=true}
 * lists all SSTable component files, including secondary index files, for the
 * <i>"testSnapshot"</i> snapshot for the <i>"ks"</i> keyspace and the <i>"tbl"</i> table
 *
 * <p>When a snapshot list cache configuration is present and enabled, the (asynchronous) listing
 * is cached per host and request. A listing that fails is evicted from the cache as soon as the
 * failure is observed, so that a subsequent request triggers a fresh listing.
 */
@Singleton
public class ListSnapshotHandler extends AbstractHandler<SnapshotRequestParam>
{
    private static final String INCLUDE_SECONDARY_INDEX_FILES_QUERY_PARAM = "includeSecondaryIndexFiles";
    public static final String SNAPSHOT_CACHE_NAME = "snapshot_cache";

    private final SnapshotPathBuilder builder;
    private final ServiceConfiguration configuration;
    private final CacheConfiguration cacheConfiguration;
    // null when no cache configuration is provided, see initializeCache
    private final Cache<String, Future<ListSnapshotFilesResponse>> cache;

    /**
     * Constructs the handler and initializes the snapshot list cache from the service configuration.
     *
     * @param builder         used to stream the files that belong to a snapshot
     * @param configuration   the service configuration; provides the port and the cache configuration
     * @param metadataFetcher the fetcher used to resolve the delegate for a host
     * @param validator       the validator passed to the parent handler
     * @param executorPools   the executor pools used to run blocking work
     * @param sidecarMetrics  provides the stats counter that records the snapshot cache statistics
     */
    @Inject
    public ListSnapshotHandler(SnapshotPathBuilder builder,
                               ServiceConfiguration configuration,
                               InstanceMetadataFetcher metadataFetcher,
                               CassandraInputValidator validator,
                               ExecutorPools executorPools,
                               SidecarMetrics sidecarMetrics)
    {
        super(metadataFetcher, executorPools, validator);
        this.builder = builder;
        this.configuration = configuration;
        this.cacheConfiguration = configuration.sstableSnapshotConfiguration().snapshotListCacheConfiguration();
        this.cache = initializeCache(cacheConfiguration, sidecarMetrics.server().cacheMetrics().snapshotCacheMetrics);
    }

    /**
     * Lists paths of all the snapshot files of a given snapshot name.
     * <p>
     * The query param {@code includeSecondaryIndexFiles} is used to request secondary index
     * files along with other files. For example:
     * <p>
     * {@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot}
     * lists all SSTable component files for the <i>"testSnapshot"</i> snapshot for the
     * <i>"ks"</i> keyspace and the <i>"tbl"</i> table
     * <p>
     * {@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot?includeSecondaryIndexFiles=true}
     * lists all SSTable component files, including secondary index files, for the
     * <i>"testSnapshot"</i> snapshot for the <i>"ks"</i> keyspace and the <i>"tbl"</i> table
     * <p>
     * A response without any snapshot file is reported to the client as {@code 404 Not Found}.
     *
     * @param context       the event to handle
     * @param httpRequest   the {@link HttpServerRequest} object
     * @param host          the name of the host
     * @param remoteAddress the remote address that originated the request
     * @param request       parameters obtained from the request
     */
    @Override
    public void handleInternal(RoutingContext context,
                               HttpServerRequest httpRequest,
                               String host,
                               SocketAddress remoteAddress,
                               SnapshotRequestParam request)
    {
        cachedResponseOrProcess(host, request)
        .onSuccess(response -> {
            if (response.snapshotFilesInfo().isEmpty())
            {
                String payload = "Snapshot '" + request.snapshotName() + "' not found";
                context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, payload));
            }
            else
            {
                logger.debug("SnapshotsHandler handled request={}, remoteAddress={}, " +
                             "instance={}", request, remoteAddress, host);
                context.json(response);
            }
        })
        .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request));
    }

    /**
     * Logs the failure and maps missing-file errors to {@code 404 Not Found}; every other failure
     * is delegated to the parent handler.
     *
     * @param cause         the failure to process
     * @param context       the event being handled
     * @param host          the name of the host
     * @param remoteAddress the remote address that originated the request
     * @param request       parameters obtained from the request
     */
    @Override
    protected void processFailure(Throwable cause,
                                  RoutingContext context,
                                  String host,
                                  SocketAddress remoteAddress,
                                  SnapshotRequestParam request)
    {
        logger.error("SnapshotsHandler failed for request={}, remoteAddress={}, instance={}, method={}",
                     request, remoteAddress, host, context.request().method(), cause);
        if (cause instanceof FileNotFoundException || cause instanceof NoSuchFileException)
        {
            context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, cause.getMessage()));
        }
        else
        {
            super.processFailure(cause, context, host, remoteAddress, request);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected SnapshotRequestParam extractParamsOrThrow(RoutingContext context)
    {
        // the query param is optional and defaults to false
        boolean includeSecondaryIndexFiles =
        RequestUtils.parseBooleanQueryParam(context.request(), INCLUDE_SECONDARY_INDEX_FILES_QUERY_PARAM, false);
        return SnapshotRequestParam.builder()
                                   .qualifiedTableName(qualifiedTableName(context))
                                   .snapshotName(context.pathParam("snapshot"))
                                   .includeSecondaryIndexFiles(includeSecondaryIndexFiles)
                                   .build();
    }

    /**
     * Returns the listing for the request, served from the cache when the cache exists and is
     * enabled, and computed directly otherwise.
     * <p>
     * Only failed listings are evicted from the cache; successfully completed listings
     * (including empty ones) stay cached until the cache itself removes them.
     *
     * @param host    the name of the host
     * @param request parameters obtained from the request
     * @return a future with the snapshot files response
     */
    protected Future<ListSnapshotFilesResponse> cachedResponseOrProcess(String host, SnapshotRequestParam request)
    {
        if (cache == null || !cacheConfiguration.enabled())
        {
            return processResponse(host, request);
        }

        // NOTE(review): the key relies on the string representation of the request
        String key = host + ":" + request;
        Future<ListSnapshotFilesResponse> future = cache.get(key, k -> processResponse(host, request));
        // The cache stores the future itself, so without this a failed future would keep being
        // served to later requests for as long as the entry is accessed. The handler is registered
        // after cache.get returns (never inside the mapping function) so that the cache is not
        // modified while the entry is being computed. The conditional remove only evicts this
        // exact future and leaves any newer entry for the same key untouched.
        future.onFailure(cause -> cache.asMap().remove(key, future));
        return future;
    }

    /**
     * Resolves the data paths of the table, streams the snapshot files found under them and
     * builds the response.
     *
     * @param host    the name of the host
     * @param request parameters obtained from the request
     * @return a future with the snapshot files response
     */
    protected Future<ListSnapshotFilesResponse> processResponse(String host, SnapshotRequestParam request)
    {
        return dataPaths(host, request.keyspace(), request.tableName())
               .compose(dataDirectoryList ->
                        builder.streamSnapshotFiles(dataDirectoryList,
                                                    request.snapshotName(),
                                                    request.includeSecondaryIndexFiles())
               )
               .compose(snapshotFileStream -> buildResponse(host, request, snapshotFileStream));
    }

    /**
     * Retrieves the data paths of a table through the table operations of the host's delegate.
     * The lookup runs as a blocking task on the service executor pool.
     *
     * @param host     the name of the host
     * @param keyspace the name of the keyspace
     * @param table    the name of the table
     * @return a future with the data paths of the table; the future fails with the exception
     * produced by {@code cassandraServiceUnavailable()} when the delegate or its table operations
     * are not available
     */
    protected Future<List<String>> dataPaths(String host, String keyspace, String table)
    {
        return executorPools.service().executeBlocking(() -> {
            CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
            if (delegate == null)
            {
                throw cassandraServiceUnavailable();
            }
            TableOperations tableOperations = delegate.tableOperations();
            if (tableOperations == null)
            {
                throw cassandraServiceUnavailable();
            }
            return tableOperations.getDataPaths(keyspace, table);
        });
    }

    /**
     * Builds the response from the stream of snapshot files. The stream is fully consumed and
     * closed by this method.
     *
     * @param host               the name of the host
     * @param request            parameters obtained from the request
     * @param snapshotFileStream the stream of snapshot files to add to the response
     * @return a succeeded future with the snapshot files response
     */
    protected Future<ListSnapshotFilesResponse>
    buildResponse(String host,
                  SnapshotRequestParam request,
                  Stream<SnapshotPathBuilder.SnapshotFile> snapshotFileStream)
    {
        int sidecarPort = configuration.port();
        ListSnapshotFilesResponse response = new ListSnapshotFilesResponse();
        // Close the stream once consumed: it may be backed by file system resources (TODO confirm
        // against SnapshotPathBuilder). Closing is a no-op for streams that hold no resources.
        try (Stream<SnapshotPathBuilder.SnapshotFile> files = snapshotFileStream)
        {
            files.forEach(file -> {
                response.addSnapshotFile(new ListSnapshotFilesResponse.FileInfo(file.size,
                                                                                host,
                                                                                sidecarPort,
                                                                                file.dataDirectoryIndex,
                                                                                request.snapshotName(),
                                                                                request.keyspace(),
                                                                                request.tableName(),
                                                                                file.tableId,
                                                                                file.name));
            });
        }
        return Future.succeededFuture(response);
    }

    /**
     * Creates the cache that holds the snapshot listings.
     * <p>
     * The cache is bounded by the configured maximum size, expires entries a configured time
     * after their last access, records its statistics in the provided counter and logs every
     * removal at debug level.
     *
     * @param cacheConfiguration   the configuration of the cache, may be {@code null}
     * @param snapshotCacheMetrics the stats counter that records the cache statistics
     * @return the cache, or {@code null} when no cache configuration is provided
     */
    protected Cache<String, Future<ListSnapshotFilesResponse>> initializeCache(CacheConfiguration cacheConfiguration,
                                                                               CacheStatsCounter snapshotCacheMetrics)
    {
        if (cacheConfiguration == null)
        {
            return null;
        }
        return Caffeine.newBuilder()
                       .maximumSize(cacheConfiguration.maximumSize())
                       .expireAfterAccess(cacheConfiguration.expireAfterAccessMillis(), TimeUnit.MILLISECONDS)
                       .recordStats(() -> snapshotCacheMetrics)
                       .removalListener((key, value, cause) ->
                                        logger.debug("Removed from cache={}, entry={}, key={}, cause={}",
                                                     SNAPSHOT_CACHE_NAME, value, key, cause))
                       .build();
    }
}