CASSANDRASC-106: Add restore task watcher to report long running tasks (#104)
Patch by Doug Rohrer; Reviewed by Yifan Cai, Francisco Guerrero for CASSANDRASC-106
diff --git a/CHANGES.txt b/CHANGES.txt
index e625cde..ce39b66 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Add restore task watcher to report long running tasks (CASSANDRASC-106)
* RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105)
* Make hash algorithm implementation pluggable (CASSANDRASC-114)
* Fix ClosedChannelException when downloading from S3 (CASSANDRASC-112)
@@ -81,4 +82,4 @@
* Add integration tests task (CASSANDRA-15031)
* Add support for SSL and bindable address (CASSANDRA-15030)
* Autogenerate API docs for sidecar (CASSANDRA-15028)
- * C* Management process (CASSANDRA-14395)
+ * C* Management process (CASSANDRA-14395)
\ No newline at end of file
diff --git a/checkstyle.xml b/checkstyle.xml
index fd7c1b0..cea9a21 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -244,8 +244,8 @@
<module name="LineLength">
<!-- Checks if a line is too long. -->
- <property name="max" value="120" default="120" />
- <property name="severity" value="error" />
+ <property name="max" value="160" />
+ <property name="severity" value="warning" />
<!--
The default ignore pattern exempts the following elements:
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
index 0dd9e52..3480dca 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
@@ -155,6 +155,7 @@
/**
* Closes the underlying HTTP client
*/
+ @Override
public void close() throws Exception
{
httpClient.close();
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java b/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
index 62df635..ecdca71 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
@@ -47,6 +47,7 @@
*
* @return an iterator of {@link SidecarInstance instances}
*/
+ @Override
@NotNull
public Iterator<SidecarInstance> iterator()
{
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
index f394ca2..6bc30a0 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
@@ -119,6 +119,7 @@
*
* @return Session
*/
+ @Override
@Nullable
public synchronized Session get()
{
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index 7bc3a64..8ce81e2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -373,11 +373,13 @@
return nodeSettingsFromJmx;
}
+ @Override
public ResultSet executeLocal(Statement statement)
{
return fromAdapter(adapter -> adapter.executeLocal(statement));
}
+ @Override
public InetSocketAddress localNativeTransportPort()
{
return fromAdapter(ICassandraAdapter::localNativeTransportPort);
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
index 3231f17..ec59165 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
@@ -48,4 +48,9 @@
* @return time to live for restore job tables: restore_job and restore_slice
*/
long restoreJobTablesTtlSeconds();
+
+ /**
+ * @return the number of seconds above which a restore handler is considered "long-running"
+ */
+ long restoreJobLongRunningHandlerThresholdSeconds();
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
index 49334ed..c424728 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
@@ -34,18 +34,21 @@
private String localDc;
private int numConnections;
+ @Override
@JsonProperty("contact_points")
public List<InetSocketAddress> contactPoints()
{
return contactPoints;
}
+ @Override
@JsonProperty("num_connections")
public int numConnections()
{
return numConnections;
}
+ @Override
@JsonProperty("local_dc")
public String localDc()
{
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
index 022d4ec..8349011 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
@@ -47,6 +47,7 @@
/**
* @return the maximum number of connection retry attempts to make before failing
*/
+ @Override
@JsonProperty("max_retries")
public int maxRetries()
{
@@ -56,6 +57,7 @@
/**
* @return the delay, in milliseconds, between retry attempts
*/
+ @Override
@JsonProperty("retry_delay_millis")
public long retryDelayMillis()
{
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
index 3475f91..336340c 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
@@ -36,6 +36,10 @@
public static final int DEFAULT_JOB_DISCOVERY_RECENCY_DAYS = 5;
public static final int DEFAULT_PROCESS_MAX_CONCURRENCY = 20; // process at most 20 slices concurrently
public static final long DEFAULT_RESTORE_JOB_TABLES_TTL_SECONDS = TimeUnit.DAYS.toSeconds(90);
+ public static final String RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS =
+ "restore_job_long_running_threshold_seconds";
+ // A restore job handler is considered long-running if it has been in the "active" list for 10 minutes.
+ private static final long DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS = 600;
@JsonProperty(value = "job_discovery_active_loop_delay_millis")
protected final long jobDiscoveryActiveLoopDelayMillis;
@@ -52,6 +56,11 @@
@JsonProperty(value = "restore_job_tables_ttl_seconds")
protected final long restoreJobTablesTtlSeconds;
+
+ @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
+ defaultValue = DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS + "")
+ private final long restoreJobLongRunningThresholdSeconds;
+
protected RestoreJobConfigurationImpl()
{
this(builder());
@@ -64,6 +73,7 @@
this.jobDiscoveryRecencyDays = builder.jobDiscoveryRecencyDays;
this.processMaxConcurrency = builder.processMaxConcurrency;
this.restoreJobTablesTtlSeconds = builder.restoreJobTablesTtlSeconds;
+ this.restoreJobLongRunningThresholdSeconds = builder.restoreJobLongRunningThresholdSeconds;
validate();
}
@@ -132,6 +142,14 @@
return restoreJobTablesTtlSeconds;
}
+ @Override
+ @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
+ defaultValue = DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS + "")
+ public long restoreJobLongRunningHandlerThresholdSeconds()
+ {
+ return restoreJobLongRunningThresholdSeconds;
+ }
+
public static Builder builder()
{
return new Builder();
@@ -142,6 +160,8 @@
*/
public static class Builder implements DataObjectBuilder<Builder, RestoreJobConfigurationImpl>
{
+ protected long restoreJobLongRunningThresholdSeconds =
+ DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS;
private long jobDiscoveryActiveLoopDelayMillis = DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS;
private long jobDiscoveryIdleLoopDelayMillis = DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS;
private int jobDiscoveryRecencyDays = DEFAULT_JOB_DISCOVERY_RECENCY_DAYS;
@@ -218,6 +238,18 @@
return update(b -> b.restoreJobTablesTtlSeconds = restoreJobTablesTtlSeconds);
}
+ /**
+ * Sets the {@code restoreJobLongRunningThresholdSeconds} and returns a reference to this Builder enabling
+ * method chaining.
+ *
+ * @param restoreJobLongRunningThresholdSeconds the {@code restoreJobLongRunningThresholdSeconds} to set
+ * @return a reference to this Builder
+ */
+ public Builder restoreJobLongRunningThresholdSeconds(long restoreJobLongRunningThresholdSeconds)
+ {
+ return update(b -> b.restoreJobLongRunningThresholdSeconds = restoreJobLongRunningThresholdSeconds);
+ }
+
@Override
public RestoreJobConfigurationImpl build()
{
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
index 7563e93..c4c7100 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
@@ -29,8 +29,6 @@
import java.util.UUID;
import com.datastax.driver.core.Row;
-import io.vertx.core.Handler;
-import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.DataObjectBuilder;
import org.apache.cassandra.sidecar.common.data.CreateSliceRequestPayload;
@@ -40,6 +38,7 @@
import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.restore.RestoreJobUtil;
+import org.apache.cassandra.sidecar.restore.RestoreSliceHandler;
import org.apache.cassandra.sidecar.restore.RestoreSliceTask;
import org.apache.cassandra.sidecar.restore.RestoreSliceTracker;
import org.apache.cassandra.sidecar.restore.StorageClient;
@@ -229,17 +228,17 @@
/**
* @return {@link RestoreSliceTask} of the restore slice. See {@link RestoreSliceTask} for the steps.
*/
- public Handler<Promise<RestoreSlice>> toAsyncTask(StorageClientPool s3ClientPool,
- ExecutorPools.TaskExecutorPool executorPool,
- SSTableImporter importer,
- double requiredUsableSpacePercentage,
- RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
- RestoreJobStats stats,
- RestoreJobUtil restoreJobUtil)
+ public RestoreSliceHandler toAsyncTask(StorageClientPool s3ClientPool,
+ ExecutorPools.TaskExecutorPool executorPool,
+ SSTableImporter importer,
+ double requiredUsableSpacePercentage,
+ RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
+ RestoreJobStats stats,
+ RestoreJobUtil restoreJobUtil)
{
if (isCancelled)
- return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
- this, null));
+ return RestoreSliceTask.failed(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
+ this, null), this);
try
{
@@ -254,13 +253,13 @@
catch (IllegalStateException illegalState)
{
// The slice is not registered with a tracker, retry later.
- return promise -> promise.tryFail(RestoreJobExceptions.ofSlice("Restore slice is not started",
- this, illegalState));
+ return RestoreSliceTask.failed(RestoreJobExceptions.ofSlice("Restore slice is not started",
+ this, illegalState), this);
}
catch (Exception cause)
{
- return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is failed",
- this, cause));
+ return RestoreSliceTask.failed(RestoreJobExceptions.ofFatalSlice("Restore slice is failed",
+ this, cause), this);
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
index 0602a9e..3c9e2f1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
@@ -48,6 +48,7 @@
this.tableTtlSeconds = tableTtlSeconds;
}
+ @Override
protected void prepareStatements(@NotNull Session session)
{
insertSlice = prepare(insertSlice, session, CqlLiterals.insertSlice(keyspaceConfig));
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index ef36316..8651d53 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -59,7 +59,7 @@
private static final int RESTORE_JOB_PREFIX_LEN = RESTORE_JOB_PREFIX.length();
private static final int RESTORE_JOB_DEFAULT_HASH_SEED = 0;
- private DigestAlgorithmProvider digestAlgorithmProvider;
+ private final DigestAlgorithmProvider digestAlgorithmProvider;
@Inject
public RestoreJobUtil(@Named("xxhash32") DigestAlgorithmProvider digestAlgorithmProvider)
@@ -223,4 +223,12 @@
}
}
}
+
+ /**
+ * @return the current time in nanoseconds
+ */
+ public long currentTimeNanos()
+ {
+ return System.nanoTime();
+ }
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index 1a438dc..edded10 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -21,8 +21,10 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
@@ -62,6 +64,8 @@
private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
private final RestoreJobStats stats;
private final RestoreJobUtil restoreJobUtil;
+ private final Set<RestoreSliceHandler> activeTasks = ConcurrentHashMap.newKeySet();
+ private final long longRunningHandlerThresholdInSeconds;
private volatile boolean isClosed = false; // OK to run close twice, so relax the control to volatile
@@ -82,6 +86,8 @@
.processMaxConcurrency());
this.requiredUsableSpacePercentage
= config.serviceConfiguration().ssTableUploadConfiguration().minimumSpacePercentageRequired() / 100.0;
+ this.longRunningHandlerThresholdInSeconds = config.restoreJobConfiguration()
+ .restoreJobLongRunningHandlerThresholdSeconds();
this.importer = importer;
this.sliceDatabaseAccessor = sliceDatabaseAccessor;
this.stats = stats;
@@ -128,26 +134,28 @@
if (slice == null) // it should never happen, and is only to make ide happy
{
processMaxConcurrency.releasePermit();
- return;
+ break;
}
// capture the new queue length after polling
sliceQueue.captureImportQueueLength();
- pool.executeBlocking(slice.toAsyncTask(s3ClientPool, pool, importer,
- requiredUsableSpacePercentage,
- sliceDatabaseAccessor, stats,
- restoreJobUtil),
- false) // unordered
+ RestoreSliceHandler task = slice.toAsyncTask(s3ClientPool, pool, importer,
+ requiredUsableSpacePercentage,
+ sliceDatabaseAccessor, stats,
+ restoreJobUtil);
+ activeTasks.add(task);
+ pool.executeBlocking(task, false) // unordered; run in parallel
.onSuccess(restoreSlice -> {
+ int instanceId = slice.owner().id();
if (slice.hasImported())
{
- stats.captureSliceCompletionTime(slice.owner().id(), System.nanoTime() - slice.creationTimeNanos());
+ stats.captureSliceCompletionTime(instanceId, System.nanoTime() - slice.creationTimeNanos());
LOGGER.info("Slice completes successfully. sliceKey={}", slice.key());
slice.complete();
}
else if (slice.hasStaged())
{
- // todo: report stat of time taken to stage
+ stats.captureSliceStageTime(instanceId, task.elapsedInNanos());
LOGGER.info("Slice has been staged successfully. sliceKey={}", slice.key());
// the slice is not fully complete yet. Re-enqueue the slice.
sliceQueue.offer(slice);
@@ -186,12 +194,38 @@
// decrement the active slices and capture the new queue length
sliceQueue.decrementActiveSliceCount(slice);
sliceQueue.captureImportQueueLength();
+ activeTasks.remove(task);
});
}
promise.tryComplete();
+ checkForLongRunningTasks();
sliceQueue.capturePendingSliceCount();
}
+ private void checkForLongRunningTasks()
+ {
+ for (RestoreSliceHandler task : activeTasks)
+ {
+ long elapsedInNanos = task.elapsedInNanos();
+ if (elapsedInNanos == -1)
+ {
+ continue;
+ }
+ long elapsedInSeconds = TimeUnit.SECONDS.convert(elapsedInNanos, TimeUnit.NANOSECONDS);
+ if (elapsedInSeconds > longRunningHandlerThresholdInSeconds)
+ {
+ LOGGER.warn("Long-running restore slice task detected. " +
+ "elapsedSeconds={} thresholdSeconds={} sliceKey={} jobId={} status={}",
+ elapsedInSeconds,
+ longRunningHandlerThresholdInSeconds,
+ task.slice().key(),
+ task.slice().jobId(),
+ task.slice().job().status);
+ stats.captureLongRunningRestoreHandler(task.slice().owner().id(), elapsedInNanos);
+ }
+ }
+ }
+
@Override
public void close()
{
@@ -207,6 +241,12 @@
}
@VisibleForTesting
+ int activeTasks()
+ {
+ return activeTasks.size();
+ }
+
+ @VisibleForTesting
int pendingStartSlices()
{
return sliceQueue.size();
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java
new file mode 100644
index 0000000..fdcf390
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.restore;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.db.RestoreSlice;
+
+/**
+ * A handler that processes a restore slice
+ */
+public interface RestoreSliceHandler extends Handler<Promise<RestoreSlice>>
+{
+ /**
+ * @return slice the handler processes
+ */
+ RestoreSlice slice();
+
+ /**
+ * @return the elapsed time in nanoseconds if the task has started processing, -1 otherwise
+ */
+ long elapsedInNanos();
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index 4e942c7..ed85d3f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -28,7 +28,6 @@
import org.slf4j.LoggerFactory;
import io.vertx.core.Future;
-import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.ext.web.handler.HttpException;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
@@ -59,7 +58,7 @@
*
* Note that the class is package private, and it is not intended to be referenced by other packages.
*/
-public class RestoreSliceTask implements Handler<Promise<RestoreSlice>>
+public class RestoreSliceTask implements RestoreSliceHandler
{
private static final Logger LOGGER = LoggerFactory.getLogger(RestoreSliceTask.class);
@@ -71,6 +70,7 @@
private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
private final RestoreJobStats stats;
private final RestoreJobUtil restoreJobUtil;
+ private long taskStartTimeNanos = -1;
public RestoreSliceTask(RestoreSlice slice,
StorageClient s3Client,
@@ -94,9 +94,15 @@
this.restoreJobUtil = restoreJobUtil;
}
+ public static RestoreSliceHandler failed(RestoreJobException cause, RestoreSlice slice)
+ {
+ return new Failed(cause, slice);
+ }
+
@Override
public void handle(Promise<RestoreSlice> event)
{
+ this.taskStartTimeNanos = restoreJobUtil.currentTimeNanos();
if (failOnCancelled(event))
return;
@@ -192,7 +198,7 @@
.whenComplete((resp, cause) -> {
if (cause == null)
{
- stats.captureSliceReplicationTime(System.nanoTime() - slice.creationTimeNanos());
+ stats.captureSliceReplicationTime(currentTimeInNanos() - slice.creationTimeNanos());
slice.setExistsOnS3();
return;
}
@@ -244,6 +250,11 @@
});
}
+ private long currentTimeInNanos()
+ {
+ return restoreJobUtil.currentTimeNanos();
+ }
+
private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event)
{
if (slice.isCancelled())
@@ -521,4 +532,53 @@
LOGGER.warn("Committing slice failed with HttpException. slice={} statusCode={} exceptionPayload={}",
slice.sliceId(), httpException.getStatusCode(), httpException.getPayload(), httpException);
}
+
+ @Override
+ public long elapsedInNanos()
+ {
+ return taskStartTimeNanos == -1 ? -1 :
+ currentTimeInNanos() - taskStartTimeNanos;
+ }
+
+ @Override
+ public RestoreSlice slice()
+ {
+ return slice;
+ }
+
+ /**
+ * A RestoreSliceHandler that immediately fails the slice/promise.
+ * Used when the processor already knows that a slice should not be processed for some reason
+ * as indicated in cause field.
+ */
+ public static class Failed implements RestoreSliceHandler
+ {
+ private final RestoreJobException cause;
+ private final RestoreSlice slice;
+
+ public Failed(RestoreJobException cause, RestoreSlice slice)
+ {
+ this.cause = cause;
+ this.slice = slice;
+ }
+
+ @Override
+ public void handle(Promise<RestoreSlice> promise)
+ {
+ promise.tryFail(cause);
+ }
+
+ @Override
+ public long elapsedInNanos()
+ {
+ // it fails immediately
+ return 0;
+ }
+
+ @Override
+ public RestoreSlice slice()
+ {
+ return slice;
+ }
+ }
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java b/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
index 1a44bd8..27ca5fe 100644
--- a/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
+++ b/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
@@ -26,7 +26,7 @@
/**
* Captures the total time taken to complete a slice successfully
*
- * @param instanceId instance that contains the slice
+ * @param instanceId instance that is processing the slice
* @param durationNanos duration in nanoseconds
*/
default void captureSliceCompletionTime(int instanceId, long durationNanos)
@@ -35,6 +35,17 @@
}
/**
+ * Captures the total time taken to stage the slice
+ *
+ * @param instanceId instance that contains the slice
+ * @param durationNanos duration in nanoseconds
+ */
+ default void captureSliceStageTime(int instanceId, long durationNanos)
+ {
+
+ }
+
+ /**
* Captures the time taken to import SSTable(s) in a slice
*
* @param instanceId instance that owns the slice
@@ -217,4 +228,14 @@
{
}
+
+ /**
+ * Captures a long-running restore job handler
+ * @param instanceId instance that is processing the slice
+ * @param handlerDuration restore job current duration in nanoseconds
+ */
+ default void captureLongRunningRestoreHandler(int instanceId, long handlerDuration)
+ {
+
+ }
}
diff --git a/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
index 4628a14..5b23031 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
@@ -55,6 +55,7 @@
.collect(Collectors.toList());
}
+ @Override
protected int getNumInstancesToManage(int clusterSize)
{
return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first 2 instances in the "cluster"
diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
index ef76ffa..d0c069d 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
@@ -118,6 +118,7 @@
* @return instance meta information
* @throws NoSuchElementException when the instance with {@code id} does not exist
*/
+ @Override
public InstanceMetadata instanceFromId(int id) throws NoSuchElementException
{
return cassandraTestContext.instancesConfig().instanceFromId(id);
@@ -130,6 +131,7 @@
* @return instance meta information
* @throws NoSuchElementException when the instance for {@code host} does not exist
*/
+ @Override
public InstanceMetadata instanceFromHost(String host) throws NoSuchElementException
{
return cassandraTestContext.instancesConfig().instanceFromHost(host);
diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
index 3bdeead..2dbf616 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
@@ -48,11 +48,13 @@
private final HashedWheelTimer sharedHWT = new HashedWheelTimer(threadFactory);
private final EventLoopGroup sharedEventLoopGroup = new NioEventLoopGroup(0, threadFactory);
+ @Override
public EventLoopGroup eventLoopGroup(ThreadFactory threadFactory)
{
return sharedEventLoopGroup;
}
+ @Override
public void onClusterClose(EventLoopGroup eventLoopGroup)
{
}
@@ -63,6 +65,7 @@
return sharedHWT;
}
+ @Override
public void onClusterClose(Timer timer)
{
}
diff --git a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
index d7cd14b..0e90ae8 100644
--- a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
+++ b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
@@ -100,6 +100,7 @@
}
+ @Override
public int compareTo(SimpleCassandraVersion other)
{
if (major < other.major)
diff --git a/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java b/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
index 641ba55..2939d45 100644
--- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
@@ -30,6 +30,7 @@
@ExtendWith(VertxExtension.class)
public class HealthServiceSslTest extends AbstractHealthServiceTest
{
+ @Override
public boolean isSslEnabled()
{
return true;
diff --git a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java b/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
index ef088db..ac4a942 100644
--- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
@@ -30,6 +30,7 @@
@ExtendWith(VertxExtension.class)
public class HealthServiceTest extends AbstractHealthServiceTest
{
+ @Override
public boolean isSslEnabled()
{
return false;
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 518fc70..68bb4d8 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -112,6 +112,7 @@
RestoreJobConfigurationImpl.builder()
.restoreJobTablesTtlSeconds(TimeUnit.DAYS.toSeconds(14) + 1)
.processMaxConcurrency(RESTORE_MAX_CONCURRENCY)
+ .restoreJobLongRunningThresholdSeconds(1)
.build();
HealthCheckConfiguration healthCheckConfiguration = new HealthCheckConfigurationImpl(200, 1000);
return SidecarConfigurationImpl.builder()
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
index c3fafc8..beea547 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
@@ -19,6 +19,9 @@
package org.apache.cassandra.sidecar.restore;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.BeforeEach;
@@ -28,7 +31,10 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.util.Modules;
+import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreSlice;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.server.MainModule;
@@ -170,14 +176,74 @@
});
}
+ @Test
+ public void testLongRunningHandlerDetection()
+ {
+
+ when(sidecarSchema.isInitialized()).thenReturn(true);
+ periodicTaskExecutor.schedule(processor);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicLong currentTime = new AtomicLong(0);
+ RestoreSlice slice = mockSlowSlice(latch, currentTime::get); // Sets the start time
+ long fiveMinutesInNanos = TimeUnit.NANOSECONDS.convert(5, TimeUnit.MINUTES);
+ currentTime.set(fiveMinutesInNanos);
+ processor.submit(slice);
+ loopAssert(3, () -> {
+ assertThat(stats.longRunningRestoreHandlers.size()).isEqualTo(1);
+ Long handlerTimeInNanos = stats.longRunningRestoreHandlers.get(slice.owner().id());
+ assertThat(handlerTimeInNanos).isNotNull();
+ assertThat(handlerTimeInNanos).isEqualTo(fiveMinutesInNanos);
+ assertThat(processor.activeTasks()).isOne();
+ });
+
+ // Make slice completable.
+ latch.countDown();
+
+ // Make sure when the slice completes the active handler is removed
+ loopAssert(3, () -> {
+ assertThat(processor.activeTasks()).isZero();
+ });
+ }
+
private RestoreSlice mockSlowSlice(CountDownLatch latch)
{
+ return mockSlowSlice(latch, System::nanoTime);
+ }
+
+ private RestoreSlice mockSlowSlice(CountDownLatch latch, Supplier<Long> timeInNanosSupplier)
+ {
RestoreSlice slice = mock(RestoreSlice.class, Mockito.RETURNS_DEEP_STUBS);
when(slice.jobId()).thenReturn(UUIDs.timeBased());
when(slice.owner().id()).thenReturn(1);
- when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any())).thenReturn(promise -> {
- Uninterruptibles.awaitUninterruptibly(latch);
- promise.complete(slice);
+ when(slice.key()).thenReturn("SliceKey");
+ RestoreJob job = RestoreJob.builder()
+ .jobStatus(RestoreJobStatus.CREATED)
+ .build();
+ when(slice.job()).thenReturn(job);
+ when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any())).thenReturn(
+ new RestoreSliceHandler()
+ {
+ private Long startTime = timeInNanosSupplier.get();
+
+ @Override
+ public void handle(Promise<RestoreSlice> promise)
+ {
+ Uninterruptibles.awaitUninterruptibly(latch);
+ promise.complete(slice);
+ }
+
+ @Override
+ public long elapsedInNanos()
+ {
+ return timeInNanosSupplier.get() - startTime;
+ }
+
+ @Override
+ public RestoreSlice slice()
+ {
+ return slice;
+ }
});
when(slice.hasImported()).thenReturn(true);
return slice;
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 6e9e1d7..dcfd6a1 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -25,6 +25,8 @@
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -67,6 +69,7 @@
private TaskExecutorPool executorPool;
private TestRestoreJobStats stats;
private TestRestoreSliceAccessor sliceDatabaseAccessor;
+ private RestoreJobUtil util;
@BeforeEach
void setup()
@@ -82,6 +85,7 @@
mockSSTableImporter = mock(SSTableImporter.class);
executorPool = new ExecutorPools(Vertx.vertx(), new ServiceConfigurationImpl()).internal();
stats = new TestRestoreJobStats();
+ util = mock(RestoreJobUtil.class);
sliceDatabaseAccessor = new TestRestoreSliceAccessor();
}
@@ -302,14 +306,33 @@
.hasMessageContaining("Random exception");
}
+ @Test
+ void testSliceDuration()
+ {
+ RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED, "QUORUM");
+ AtomicLong currentNanos = new AtomicLong(0);
+ RestoreSliceTask task = createTask(mockSlice, job, currentNanos::get);
+ Promise<RestoreSlice> promise = Promise.promise();
+ task.handle(promise); // Task isn't considered started until it `handle` is called
+ currentNanos.set(123L);
+ assertThat(task.elapsedInNanos()).isEqualTo(123L);
+ }
+
private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job)
{
+ return createTask(slice, job, System::nanoTime);
+ }
+
+ private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job, Supplier<Long> currentNanoTimeSupplier)
+ {
when(slice.job()).thenReturn(job);
assertThat(slice.job()).isSameAs(job);
assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
assertThat(slice.job().status).isEqualTo(job.status);
+ RestoreJobUtil util = mock(RestoreJobUtil.class);
+ when(util.currentTimeNanos()).thenAnswer(invok -> currentNanoTimeSupplier.get());
return new TestRestoreSliceTask(slice, mockStorageClient, executorPool, mockSSTableImporter,
- 0, sliceDatabaseAccessor, stats);
+ 0, sliceDatabaseAccessor, stats, util);
}
private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, RestoreJob job)
@@ -319,7 +342,7 @@
assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
assertThat(slice.job().status).isEqualTo(job.status);
return new TestUnexpectedExceptionInRestoreSliceTask(slice, mockStorageClient, executorPool,
- mockSSTableImporter, 0, sliceDatabaseAccessor, stats);
+ mockSSTableImporter, 0, sliceDatabaseAccessor, stats, util);
}
static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor
@@ -346,10 +369,11 @@
public TestRestoreSliceTask(RestoreSlice slice, StorageClient s3Client, TaskExecutorPool executorPool,
SSTableImporter importer, double requiredUsableSpacePercentage,
- RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats)
+ RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats,
+ RestoreJobUtil restoreJobUtil)
{
- super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats,
- null);
+ super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage,
+ sliceDatabaseAccessor, stats, restoreJobUtil);
this.slice = slice;
this.stats = stats;
}
@@ -382,10 +406,10 @@
TaskExecutorPool executorPool, SSTableImporter importer,
double requiredUsableSpacePercentage,
RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
- RestoreJobStats stats)
+ RestoreJobStats stats, RestoreJobUtil util)
{
super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats,
- null);
+ util);
}
@Override
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
index 30bf04b..6cf72c1 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
@@ -240,6 +240,7 @@
super(Vertx.vertx(), 1, null, null, null, null, null, "localhost", 9043);
}
+ @Override
protected JmxNotificationListener initializeJmxListener()
{
return null;
diff --git a/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java b/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
index 713c33c..0a2cd9e 100644
--- a/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
+++ b/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
@@ -19,7 +19,9 @@
package org.apache.cassandra.sidecar.stats;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Test implementation for testing restore job related stats captured
@@ -39,6 +41,7 @@
public long failedJobCount;
public long activeJobCount;
public long tokenRefreshCount;
+ public Map<Integer, Long> longRunningRestoreHandlers = new HashMap<>();
@Override
public void captureSliceCompletionTime(int instanceId, long durationNanos)
@@ -118,4 +121,10 @@
{
tokenRefreshCount += 1;
}
+
+ @Override
+ public void captureLongRunningRestoreHandler(int instanceId, long handlerDuration)
+ {
+ longRunningRestoreHandlers.put(instanceId, handlerDuration);
+ }
}