blob: a788bd5d7516aaf692a189e70747db40e4de9d17 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.routes.restore;
import java.util.UUID;
import javax.annotation.Nullable;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
import org.apache.cassandra.sidecar.common.data.RestoreJobConstants;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
import org.apache.cassandra.sidecar.routes.RoutingContextUtils;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.RESTORE_JOBS;
import static org.apache.cassandra.sidecar.routes.RoutingContextUtils.SC_QUALIFIED_TABLE_NAME;
import static org.apache.cassandra.sidecar.routes.RoutingContextUtils.SC_RESTORE_JOB;
import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
/**
* Handler for validating various facts about requests related to restore functionality of Sidecar
*/
@Singleton
public class RestoreRequestValidationHandler implements Handler<RoutingContext>
{
private final CassandraInputValidator validator;
private final RestoreJobDatabaseAccessor restoreJobs;
private final ExecutorPools executorPools;
@Inject
public RestoreRequestValidationHandler(CassandraInputValidator validator,
RestoreJobDatabaseAccessor restoreJobs,
ExecutorPools executorPools)
{
this.validator = validator;
this.restoreJobs = restoreJobs;
this.executorPools = executorPools;
}
@Override
public void handle(RoutingContext context)
{
QualifiedTableName tableName = verifyQualifiedTableName(context);
RoutingContextUtils.put(context, SC_QUALIFIED_TABLE_NAME, tableName);
UUID jobId = verifyJobIdIfPresent(context);
verifyJobStatusIfPresent(context);
verifyJobExistAndStoreAsync(context, jobId, tableName)
.onSuccess(ignored -> context.next())
.onFailure(context::fail);
}
private QualifiedTableName verifyQualifiedTableName(RoutingContext context)
{
String keyspace = context.pathParam("keyspace");
String table = context.pathParam("table");
validator.validateKeyspaceName(keyspace);
validator.validateTableName(table);
return new QualifiedTableName(keyspace, table, true);
}
private UUID verifyJobIdIfPresent(RoutingContext context)
{
HttpServerRequest request = context.request();
// Skip jobId verification if the request is sent to create restore job endpoint
if (request.method() == HttpMethod.POST
&& request.path().endsWith(RESTORE_JOBS))
{
return null;
}
String jobIdFromPath = context.pathParam("jobId");
if (jobIdFromPath == null)
{
throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
"PathParam jobId must be present for the request");
}
UUID jobId;
try
{
jobId = UUID.fromString(jobIdFromPath);
}
catch (Exception e)
{
throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
"Invalid jobId - not an UUID, " + jobIdFromPath);
}
if (jobId.version() != 1) // Version number 1: Time-based UUID
{
throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
"Invalid jobId - not a time-based UUID, " + jobIdFromPath);
}
return jobId;
}
private Future<Void> verifyJobExistAndStoreAsync(RoutingContext context, @Nullable UUID jobId,
QualifiedTableName tableName)
{
if (jobId == null)
{
return Future.succeededFuture();
}
return findJob(jobId)
.compose(restoreJob -> {
// Make sure the persisted restore job has the matching keyspace and table values
// It is to avoid selecting a wrong job mistakenly
if (!tableName.keyspace().equalsIgnoreCase(restoreJob.keyspaceName)
|| !tableName.tableName().equalsIgnoreCase(restoreJob.tableName))
{
return Future.failedFuture(wrapHttpException(HttpResponseStatus.NOT_FOUND,
"Restore job with jobId: " + jobId +
" is not found for table: " + tableName));
}
RoutingContextUtils.put(context, SC_RESTORE_JOB, restoreJob);
return Future.succeededFuture();
});
}
private void verifyJobStatusIfPresent(RoutingContext context)
{
String status = context.getBodyAsJson() == null
? null
: context.getBodyAsJson().getString(RestoreJobConstants.JOB_STATUS);
if (status == null)
{
return;
}
try
{
RestoreJobStatus.valueOf(status);
}
catch (Exception e)
{
throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
"Unrecognized restore job status passed, " + status);
}
}
private Future<RestoreJob> findJob(UUID jobId)
{
return executorPools.service().executeBlocking(promise -> {
RestoreJob restoreJob = restoreJobs.find(jobId);
if (restoreJob == null)
{
promise.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND,
"Restore job with id: " + jobId + " does not exist"));
}
else
{
promise.complete(restoreJob);
}
});
}
}