CASSANDRASC-133: Allow optional reason to abort restore jobs (#124)
Abort restore job request can include the reason for the operation now optionally. The reason is logged and persisted for the restore job.
Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRASC-133
diff --git a/CHANGES.txt b/CHANGES.txt
index f53f076..43de823 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Allow optional reason to abort restore jobs (CASSANDRASC-133)
* Fix SidecarLoadBalancingPolicy unexpectedly removing local node and improve CI stability (CASSANDRASC-131)
* Reduce implementations accessible from client (CASSANDRASC-127)
* Fix wait time acquired in SidecarRateLimiter (CASSANDRASC-124)
diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
index d18b0b3..48c8289 100644
--- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
+++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
@@ -33,6 +33,7 @@
public static final String JOB_KEYSPACE = "keyspace";
public static final String JOB_TABLE = "table";
public static final String JOB_CONSISTENCY_LEVEL = "consistencyLevel";
+ public static final String JOB_OPERATION_REASON = "reason";
public static final String SLICE_ID = "sliceId";
public static final String BUCKET_ID = "bucketId";
public static final String SLICE_START_TOKEN = "startToken";
diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/AbortRestoreJobRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/AbortRestoreJobRequest.java
index c692b18..88fe835 100644
--- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/AbortRestoreJobRequest.java
+++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/AbortRestoreJobRequest.java
@@ -22,12 +22,15 @@
import io.netty.handler.codec.http.HttpMethod;
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload;
/**
* Represents a request to abort a restore job
*/
public class AbortRestoreJobRequest extends Request
{
+ private final AbortRestoreJobRequestPayload requestPayload;
+
/**
* Constructs a Sidecar request with the given {@code requestURI}. Defaults to {@code ssl} enabled.
*
@@ -35,9 +38,10 @@
* @param table the table name in Cassandra
* @param jobId a unique identifier for the job
*/
- public AbortRestoreJobRequest(String keyspace, String table, UUID jobId)
+ public AbortRestoreJobRequest(String keyspace, String table, UUID jobId, AbortRestoreJobRequestPayload payload)
{
super(requestURI(keyspace, table, jobId));
+ this.requestPayload = payload;
}
@Override
@@ -46,6 +50,12 @@
return HttpMethod.POST;
}
+ @Override
+ public Object requestBody()
+ {
+ return requestPayload;
+ }
+
static String requestURI(String keyspace, String table, UUID jobId)
{
return ApiEndpointsV1.ABORT_RESTORE_JOB_ROUTE
diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayload.java
new file mode 100644
index 0000000..664451a
--- /dev/null
+++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayload.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request.data;
+
+import java.util.regex.Pattern;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_OPERATION_REASON;
+
+/**
+ * Request payload for aborting a restore job.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class AbortRestoreJobRequestPayload
+{
+ private static final Pattern ALPHANUMERIC_BLANK_ONLY = Pattern.compile("^[a-zA-Z0-9 ]*$");
+ private final String reason;
+
+ @JsonCreator
+ public AbortRestoreJobRequestPayload(@Nullable @JsonProperty(JOB_OPERATION_REASON) String reason)
+ {
+ this.reason = validateContent(reason);
+ }
+
+ /**
+ * @return the reason to abort the job
+ */
+ @JsonProperty(JOB_OPERATION_REASON)
+ public String reason()
+ {
+ return reason;
+ }
+
+ /**
+ * As reason string is logged and persisted, the validation is performed to avoid any malicious behavior
+ * @param reason client-sent string content
+ * @return the same string if content is good
+ */
+ private String validateContent(String reason)
+ {
+ if (reason == null)
+ {
+ return null;
+ }
+
+ Preconditions.checkArgument(reason.length() <= 1024, "Reason string is too long");
+ Preconditions.checkArgument(ALPHANUMERIC_BLANK_ONLY.matcher(reason).matches(),
+ "Reason string cannot contain non-alphanumeric-blank characters");
+ return reason;
+ }
+}
diff --git a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayloadTest.java b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayloadTest.java
new file mode 100644
index 0000000..283cdb9
--- /dev/null
+++ b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayloadTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request.data;
+
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class AbortRestoreJobRequestPayloadTest
+{
+ private static final ObjectMapper MAPPER = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ @Test
+ void testSerDeser() throws JsonProcessingException
+ {
+ AbortRestoreJobRequestPayload payload = new AbortRestoreJobRequestPayload("Expired");
+ String json = MAPPER.writeValueAsString(payload);
+ assertThat(json).isEqualTo("{\"reason\":\"Expired\"}");
+ AbortRestoreJobRequestPayload deser = MAPPER.readValue(json, AbortRestoreJobRequestPayload.class);
+ assertThat(deser.reason()).isEqualTo(payload.reason());
+
+ AbortRestoreJobRequestPayload nullPayload = new AbortRestoreJobRequestPayload(null);
+ json = MAPPER.writeValueAsString(nullPayload);
+ assertThat(json).isEqualTo("{}");
+ deser = MAPPER.readValue(json, AbortRestoreJobRequestPayload.class);
+ assertThat(deser.reason()).isNull();
+ }
+
+ @Test
+ void testValidation()
+ {
+ String longString = Stream.generate(() -> "a").limit(2048).collect(Collectors.joining());
+ assertThatThrownBy(() -> new AbortRestoreJobRequestPayload(longString))
+ .hasMessage("Reason string is too long");
+
+ String disallowedChars = "! cat /super/secrets";
+ assertThatThrownBy(() -> new AbortRestoreJobRequestPayload(disallowedChars))
+ .hasMessage("Reason string cannot contain non-alphanumeric-blank characters");
+
+ assertThatThrownBy(() -> MAPPER.readValue(String.format("{\"reason\":\"%s\"}", longString),
+ AbortRestoreJobRequestPayload.class))
+ .hasMessageContaining("Reason string is too long");
+
+ assertThatThrownBy(() -> MAPPER.readValue(String.format("{\"reason\":\"%s\"}", disallowedChars),
+ AbortRestoreJobRequestPayload.class))
+ .hasMessageContaining("Reason string cannot contain non-alphanumeric-blank characters");
+ }
+}
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 6f098f9..3d5c426 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -41,6 +41,7 @@
import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
import org.apache.cassandra.sidecar.common.request.RestoreJobSummaryRequest;
import org.apache.cassandra.sidecar.common.request.UpdateRestoreJobRequest;
+import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobResponsePayload;
import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload;
@@ -517,10 +518,11 @@
* {@inheritDoc}
*/
@Override
- public CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId)
+ public CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId,
+ AbortRestoreJobRequestPayload payload)
{
return executor.executeRequestAsync(requestBuilder()
- .request(new AbortRestoreJobRequest(keyspace, table, jobId))
+ .request(new AbortRestoreJobRequest(keyspace, table, jobId, payload))
.build());
}
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClientBlobRestoreExtension.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClientBlobRestoreExtension.java
index 7ce22e9..7b3aa8f 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClientBlobRestoreExtension.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClientBlobRestoreExtension.java
@@ -22,6 +22,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobResponsePayload;
import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload;
@@ -63,9 +64,20 @@
* @param keyspace name of the keyspace in the cluster
* @param table name of the table in the cluster
* @param jobId job ID of the restore job to be updated
+ * @param payload request payload
* @return a completable future
*/
- CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId);
+ CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId,
+ AbortRestoreJobRequestPayload payload);
+
+ /**
+ * Abort an existing restore job with no reason
+ * See {@link #abortRestoreJob(String, String, UUID, AbortRestoreJobRequestPayload)}
+ */
+ default CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId)
+ {
+ return abortRestoreJob(keyspace, table, jobId, null);
+ }
/**
* Get the summary of an existing restore job
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
index 475f957..6f34b5c 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
@@ -54,6 +54,8 @@
public final String consistencyLevel;
public final Manager restoreJobManager;
+ private final String statusText;
+
public static Builder builder()
{
return new Builder();
@@ -71,7 +73,7 @@
builder.createdAt(row.getDate("created_at"))
.jobId(row.getUUID("job_id")).jobAgent(row.getString("job_agent"))
.keyspace(row.getString("keyspace_name")).table(row.getString("table_name"))
- .jobStatus(decodeJobStatus(row.getString("status")))
+ .jobStatusText(row.getString("status"))
.jobSecrets(decodeJobSecrets(row.getBytes("blob_secrets")))
.expireAt(row.getTimestamp("expire_at"))
.sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options")))
@@ -85,7 +87,13 @@
private static RestoreJobStatus decodeJobStatus(String status)
{
- return status == null ? null : RestoreJobStatus.valueOf(status.toUpperCase());
+ if (status == null)
+ {
+ return null;
+ }
+
+ String enumLiteral = status.split(":")[0];
+ return RestoreJobStatus.valueOf(enumLiteral.toUpperCase());
}
private static RestoreJobSecrets decodeJobSecrets(ByteBuffer secretsBytes)
@@ -114,6 +122,7 @@
this.tableName = builder.tableName;
this.jobAgent = builder.jobAgent;
this.status = builder.status;
+ this.statusText = builder.statusText;
this.secrets = builder.secrets;
this.importOptions = builder.importOptions == null
? SSTableImportOptions.defaults()
@@ -134,6 +143,11 @@
return restoreJobManager == Manager.SIDECAR;
}
+ public String statusWithOptionalDescription()
+ {
+ return statusText;
+ }
+
/**
* {@inheritDoc}
*/
@@ -145,7 +159,7 @@
"expireAt='%s', bucketCount='%s', consistencyLevel='%s'}",
createdAt.toString(), jobId.toString(),
keyspaceName, tableName,
- status, secrets, importOptions,
+ statusText, secrets, importOptions,
expireAt, bucketCount, consistencyLevel);
}
@@ -177,6 +191,7 @@
private String tableName;
private String jobAgent;
private RestoreJobStatus status;
+ private String statusText;
private RestoreJobSecrets secrets;
private SSTableImportOptions importOptions;
private Date expireAt;
@@ -197,6 +212,7 @@
this.tableName = restoreJob.tableName;
this.jobAgent = restoreJob.jobAgent;
this.status = restoreJob.status;
+ this.statusText = restoreJob.statusText;
this.secrets = restoreJob.secrets;
this.importOptions = restoreJob.importOptions;
this.expireAt = restoreJob.expireAt;
@@ -229,9 +245,25 @@
return update(b -> b.jobAgent = jobAgent);
}
- public Builder jobStatus(RestoreJobStatus jobStatus)
+ public Builder jobStatus(@NotNull RestoreJobStatus jobStatus)
{
- return update(b -> b.status = jobStatus);
+ return update(b -> {
+ b.status = jobStatus;
+ b.statusText = jobStatus.name();
+ });
+ }
+
+ /**
+ * Assign the job status; primarily used when loading the restore job from database
+ * Note that the status text might contain additional description than the status enum
+ * @param statusText status text read from database
+ */
+ public Builder jobStatusText(String statusText)
+ {
+ return update(b -> {
+ b.status = decodeJobStatus(statusText);
+ b.statusText = statusText;
+ });
}
public Builder jobSecrets(RestoreJobSecrets jobSecrets)
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
index 43e5bc6..1987480 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
@@ -45,6 +45,7 @@
import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
import org.apache.cassandra.sidecar.db.schema.RestoreJobsSchema;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.jetbrains.annotations.Nullable;
/**
* RestoreJobs is the data accessor to Cassandra.
@@ -92,7 +93,7 @@
job.keyspaceName,
job.tableName,
job.jobAgent,
- job.status.toString(),
+ job.status.name(),
secrets,
importOptions,
job.consistencyLevel,
@@ -162,13 +163,18 @@
return updateBuilder.build();
}
- public void abort(UUID jobId)
+ public void abort(UUID jobId, @Nullable String reason)
{
sidecarSchema.ensureInitialized();
LocalDate createdAt = RestoreJob.toLocalDate(jobId);
+ String status = RestoreJobStatus.ABORTED.name();
+ if (reason != null)
+ {
+ status = status + ": " + reason;
+ }
BoundStatement statement = restoreJobsSchema.updateStatus()
- .bind(createdAt, jobId, RestoreJobStatus.ABORTED.name());
+ .bind(createdAt, jobId, status);
execute(statement);
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index 2f865ff..c5d007a 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -288,7 +288,7 @@
LOGGER.info("Abort expired job. jobId={} job={}", job.jobId, job);
try
{
- restoreJobDatabaseAccessor.abort(job.jobId);
+ restoreJobDatabaseAccessor.abort(job.jobId, "Expired");
return true;
}
catch (Exception exception) // do not fail on the job. Continue to drain the entire list
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index 02c1a09..cddd950 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -156,7 +156,7 @@
else
{
String msg = "Unexpected restore job status. Expected only CREATED or STAGED when " +
- "processing active slices. Found status: " + job.status;
+ "processing active slices. Found status: " + job.statusWithOptionalDescription();
Exception unexpectedState = new IllegalStateException(msg);
return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status",
slice, unexpectedState));
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
index 52a513f..0f5b822 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
@@ -21,11 +21,12 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.netty.handler.codec.http.HttpResponseStatus;
-import io.vertx.core.Future;
import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.Json;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
import org.apache.cassandra.sidecar.metrics.RestoreMetrics;
@@ -35,6 +36,7 @@
import org.apache.cassandra.sidecar.routes.RoutingContextUtils;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
import static org.apache.cassandra.sidecar.routes.RoutingContextUtils.SC_RESTORE_JOB;
import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
@@ -44,8 +46,10 @@
* {@link org.apache.cassandra.sidecar.db.RestoreJob}
*/
@Singleton
-public class AbortRestoreJobHandler extends AbstractHandler<String>
+public class AbortRestoreJobHandler extends AbstractHandler<AbortRestoreJobRequestPayload>
{
+ private static final AbortRestoreJobRequestPayload EMPTY_PAYLOAD = new AbortRestoreJobRequestPayload(null);
+
private final RestoreJobDatabaseAccessor restoreJobDatabaseAccessor;
private final RestoreJobManagerGroup restoreJobManagerGroup;
private final RestoreMetrics metrics;
@@ -69,33 +73,47 @@
HttpServerRequest httpRequest,
String host,
SocketAddress remoteAddress,
- String jobId)
+ AbortRestoreJobRequestPayload payload)
{
RoutingContextUtils
.getAsFuture(context, SC_RESTORE_JOB)
- .compose(job -> {
+ .map(job -> {
if (RestoreJobStatus.isFinalState(job.status))
{
- return Future.failedFuture(wrapHttpException(HttpResponseStatus.CONFLICT,
- "Job is already in final state: " + job.status));
+ throw wrapHttpException(HttpResponseStatus.CONFLICT,
+ "Job is already in final state: " + job.status);
}
- restoreJobDatabaseAccessor.abort(job.jobId);
+ restoreJobDatabaseAccessor.abort(job.jobId, payload.reason());
+ logger.info("Successfully aborted restore job. job={} remoteAddress={} instance={} reason='{}'",
+ job, remoteAddress, host, payload.reason());
restoreJobManagerGroup.signalRefreshRestoreJob();
- return Future.succeededFuture(job);
+ return job;
})
.onSuccess(job -> {
- logger.info("Successfully aborted restore job. job={}, remoteAddress={}, instance={}",
- job, remoteAddress, host);
metrics.failedJobs.metric.update(1);
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
})
- .onFailure(cause -> processFailure(cause, context, host, remoteAddress, jobId));
+ .onFailure(cause -> processFailure(cause, context, host, remoteAddress, payload));
}
+ @NotNull
@Override
- protected String extractParamsOrThrow(RoutingContext context)
+ protected AbortRestoreJobRequestPayload extractParamsOrThrow(RoutingContext context)
{
- return context.pathParam("jobId");
+ String bodyString = context.body().asString(); // nullable
+
+ try
+ {
+ return Json.decodeValue(bodyString, AbortRestoreJobRequestPayload.class);
+ }
+ catch (Exception cause)
+ {
+ if (bodyString != null)
+ {
+ logger.warn("Failed to deserialize json string into AbortRestoreJobRequestPayload", cause);
+ }
+ return EMPTY_PAYLOAD;
+ }
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandler.java
index a48b314..c2df454 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandler.java
@@ -82,7 +82,7 @@
RestoreJobSummaryResponsePayload response
= new RestoreJobSummaryResponsePayload(restoreJob.createdAt.toString(), restoreJob.jobId,
restoreJob.jobAgent, restoreJob.keyspaceName, restoreJob.tableName,
- restoreJob.secrets, restoreJob.status.toString());
+ restoreJob.secrets, restoreJob.statusWithOptionalDescription());
return Future.succeededFuture(response);
});
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index 6e7288d..cdc5127 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -321,8 +321,8 @@
.handler(validateRestoreJobRequest)
.handler(updateRestoreJobHandler);
- // we don't expect users to send body for abort requests, hence we don't use BodyHandler
router.post(ApiEndpointsV1.ABORT_RESTORE_JOB_ROUTE)
+ .handler(BodyHandler.create())
.handler(validateTableExistence)
.handler(validateRestoreJobRequest)
.handler(abortRestoreJobHandler);
diff --git a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
index 9fdb149..4a02d17 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
@@ -42,6 +42,10 @@
@ExtendWith(VertxExtension.class)
class RestoreJobsDatabaseAccessorIntTest extends IntegrationTestBase
{
+ QualifiedTableName qualifiedTableName = new QualifiedTableName("ks", "tbl");
+ RestoreJobSecrets secrets = RestoreJobSecretsGen.genRestoreJobSecrets();
+ long expiresAtMillis = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+
@CassandraIntegrationTest
void testCrudOperations()
{
@@ -53,16 +57,8 @@
awaitLatchOrTimeout(latch, 10, TimeUnit.SECONDS);
assertThat(accessor.findAllRecent(3)).isEmpty();
- QualifiedTableName qualifiedTableName = new QualifiedTableName("ks", "tbl");
- RestoreJobSecrets secrets = RestoreJobSecretsGen.genRestoreJobSecrets();
- long expiresAtMillis = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
- UUID jobId = UUIDs.timeBased();
- CreateRestoreJobRequestPayload payload = CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis)
- .jobId(jobId)
- .jobAgent("agent")
- .build();
- accessor.create(payload, qualifiedTableName);
-
+ // update this job
+ UUID jobId = createJob(accessor);
List<RestoreJob> foundJobs = accessor.findAllRecent(3);
assertThat(foundJobs).hasSize(1);
assertJob(foundJobs.get(0), jobId, RestoreJobStatus.CREATED, expiresAtMillis, secrets);
@@ -71,17 +67,54 @@
= new UpdateRestoreJobRequestPayload(null, null, RestoreJobStatus.SUCCEEDED, null);
accessor.update(markSucceeded, jobId);
assertJob(accessor.find(jobId), jobId, RestoreJobStatus.SUCCEEDED, expiresAtMillis, secrets);
+
+ // abort this job with reason
+ jobId = createJob(accessor);
+ foundJobs = accessor.findAllRecent(3);
+ assertThat(foundJobs).hasSize(2);
+ accessor.abort(jobId, "Reason");
+ assertJob(accessor.find(jobId), jobId, RestoreJobStatus.ABORTED, expiresAtMillis, secrets, "Reason");
+
+ // abort this job w/o reason
+ jobId = createJob(accessor);
+ foundJobs = accessor.findAllRecent(3);
+ assertThat(foundJobs).hasSize(3);
+ accessor.abort(jobId, null);
+ assertJob(accessor.find(jobId), jobId, RestoreJobStatus.ABORTED, expiresAtMillis, secrets, null);
+ }
+
+ private UUID createJob(RestoreJobDatabaseAccessor accessor)
+ {
+ UUID jobId = UUIDs.timeBased();
+ CreateRestoreJobRequestPayload payload = CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis)
+ .jobId(jobId)
+ .jobAgent("agent")
+ .build();
+ accessor.create(payload, qualifiedTableName);
+
+ return jobId;
}
private void assertJob(RestoreJob job, UUID jobId, RestoreJobStatus status, long expiresAtMillis,
RestoreJobSecrets secrets)
{
+ assertJob(job, jobId, status, expiresAtMillis, secrets, null);
+ }
+
+ private void assertJob(RestoreJob job, UUID jobId, RestoreJobStatus status, long expiresAtMillis,
+ RestoreJobSecrets secrets, String abortReason)
+ {
assertThat(job).isNotNull();
assertThat(job.jobId).isEqualTo(jobId);
assertThat(job.jobAgent).isEqualTo("agent");
assertThat(job.keyspaceName).isEqualTo("ks");
assertThat(job.tableName).isEqualTo("tbl");
assertThat(job.status).isEqualTo(status);
+ if (abortReason != null)
+ {
+ assertThat(job.statusWithOptionalDescription()).isEqualTo(String.format("%s: %s", status, abortReason));
+ }
+ assertThat(job.status).isEqualTo(status);
assertThat(job.expireAt.getTime()).isEqualTo(expiresAtMillis);
assertThat(job.secrets).isEqualTo(secrets);
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
index 671cfc3..9a31cd9 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
@@ -56,6 +56,7 @@
import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -247,7 +248,7 @@
new Date(System.currentTimeMillis() - 1000L)))
.collect(Collectors.toList());
ArgumentCaptor<UUID> abortedJobs = ArgumentCaptor.forClass(UUID.class);
- doNothing().when(mockJobAccessor).abort(abortedJobs.capture());
+ doNothing().when(mockJobAccessor).abort(abortedJobs.capture(), eq("Expired"));
when(mockJobAccessor.findAllRecent(anyInt())).thenReturn(mockResult);
loop.registerPeriodicTaskExecutor(executor);
executeBlocking();
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandlerTest.java
index 833832b..54c9d20 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandlerTest.java
@@ -24,12 +24,18 @@
import org.junit.jupiter.api.extension.ExtendWith;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.codec.BodyCodec;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.db.RestoreJobTest;
import static org.assertj.core.api.Assertions.assertThat;
@@ -77,22 +83,49 @@
context, HttpResponseStatus.CONFLICT.code());
}
+ @Test
+ void testAbortJobWithReason(VertxTestContext context) throws Throwable
+ {
+ mockLookupRestoreJob(RestoreJobTest::createNewTestingJob);
+ sendAbortRestoreJobRequestAndVerify("ks", "table", "8e5799a4-d277-11ed-8d85-6916bb9b8056",
+ context, HttpResponseStatus.OK.code(),
+ new AbortRestoreJobRequestPayload("Analytics job has failed"));
+ }
+
private void sendAbortRestoreJobRequestAndVerify(String keyspace,
String table,
String jobId,
VertxTestContext context,
int expectedStatusCode) throws Throwable
{
+ sendAbortRestoreJobRequestAndVerify(keyspace, table, jobId, context, expectedStatusCode, null);
+ }
+
+ private void sendAbortRestoreJobRequestAndVerify(String keyspace,
+ String table,
+ String jobId,
+ VertxTestContext context,
+ int expectedStatusCode,
+ AbortRestoreJobRequestPayload requestPayload) throws Throwable
+ {
WebClient client = WebClient.create(vertx, new WebClientOptions());
- client.post(server.actualPort(), "localhost", String.format(RESTORE_JOB_ABORT_ENDPOINT, keyspace, table, jobId))
- .as(BodyCodec.buffer())
- .send(resp -> {
- context.verify(() -> {
- assertThat(resp.result().statusCode()).isEqualTo(expectedStatusCode);
- })
- .completeNow();
- client.close();
- });
+ HttpRequest<Buffer> request = client.post(server.actualPort(),
+ "localhost",
+ String.format(RESTORE_JOB_ABORT_ENDPOINT, keyspace, table, jobId))
+ .as(BodyCodec.buffer());
+ Handler<AsyncResult<HttpResponse<Buffer>>> responseVerifier = resp -> {
+ context.verify(() -> assertThat(resp.result().statusCode()).isEqualTo(expectedStatusCode))
+ .completeNow();
+ client.close();
+ };
+ if (requestPayload != null)
+ {
+ request.sendJson(requestPayload, responseVerifier);
+ }
+ else
+ {
+ request.send(responseVerifier);
+ }
context.awaitCompletion(10, TimeUnit.SECONDS);
}
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
index a58cdb8..3eb94a0 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
@@ -190,7 +190,7 @@
}
@Override
- public void abort(UUID jobId)
+ public void abort(UUID jobId, String reason)
{
// do nothing
}