CASSANDRASC-106: Add restore task watcher to report long running tasks (#104)

Patch by Doug Rohrer; Reviewed by Yifan Cai, Francisco Guerrero for CASSANDRASC-106
diff --git a/CHANGES.txt b/CHANGES.txt
index e625cde..ce39b66 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Add restore task watcher to report long running tasks (CASSANDRASC-106)
  * RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105)
  * Make hash algorithm implementation pluggable (CASSANDRASC-114)
  * Fix ClosedChannelException when downloading from S3 (CASSANDRASC-112)
@@ -81,4 +82,4 @@
  * Add integration tests task (CASSANDRA-15031)
  * Add support for SSL and bindable address (CASSANDRA-15030)
  * Autogenerate API docs for sidecar (CASSANDRA-15028)
- * C* Management process (CASSANDRA-14395)
+ * C* Management process (CASSANDRA-14395)
\ No newline at end of file
diff --git a/checkstyle.xml b/checkstyle.xml
index fd7c1b0..cea9a21 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -244,8 +244,8 @@
 
         <module name="LineLength">
             <!-- Checks if a line is too long. -->
-            <property name="max" value="120" default="120" />
-            <property name="severity" value="error" />
+            <property name="max" value="160" />
+            <property name="severity" value="warning" />
 
             <!--
               The default ignore pattern exempts the following elements:
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
index 0dd9e52..3480dca 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
@@ -155,6 +155,7 @@
     /**
      * Closes the underlying HTTP client
      */
+    @Override
     public void close() throws Exception
     {
         httpClient.close();
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java b/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
index 62df635..ecdca71 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
@@ -47,6 +47,7 @@
      *
      * @return an iterator of {@link SidecarInstance instances}
      */
+    @Override
     @NotNull
     public Iterator<SidecarInstance> iterator()
     {
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
index f394ca2..6bc30a0 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
@@ -119,6 +119,7 @@
      *
      * @return Session
      */
+    @Override
     @Nullable
     public synchronized Session get()
     {
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index 7bc3a64..8ce81e2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -373,11 +373,13 @@
         return nodeSettingsFromJmx;
     }
 
+    @Override
     public ResultSet executeLocal(Statement statement)
     {
         return fromAdapter(adapter -> adapter.executeLocal(statement));
     }
 
+    @Override
     public InetSocketAddress localNativeTransportPort()
     {
         return fromAdapter(ICassandraAdapter::localNativeTransportPort);
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
index 3231f17..ec59165 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
@@ -48,4 +48,9 @@
      * @return time to live for restore job tables: restore_job and restore_slice
      */
     long restoreJobTablesTtlSeconds();
+
+    /**
+     * @return the number of seconds above which a restore handler is considered "long-running"
+     */
+    long restoreJobLongRunningHandlerThresholdSeconds();
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
index 49334ed..c424728 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
@@ -34,18 +34,21 @@
     private String localDc;
     private int numConnections;
 
+    @Override
     @JsonProperty("contact_points")
     public List<InetSocketAddress> contactPoints()
     {
         return contactPoints;
     }
 
+    @Override
     @JsonProperty("num_connections")
     public int numConnections()
     {
         return numConnections;
     }
 
+    @Override
     @JsonProperty("local_dc")
     public String localDc()
     {
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
index 022d4ec..8349011 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
@@ -47,6 +47,7 @@
     /**
      * @return the maximum number of connection retry attempts to make before failing
      */
+    @Override
     @JsonProperty("max_retries")
     public int maxRetries()
     {
@@ -56,6 +57,7 @@
     /**
      * @return the delay, in milliseconds, between retry attempts
      */
+    @Override
     @JsonProperty("retry_delay_millis")
     public long retryDelayMillis()
     {
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
index 3475f91..336340c 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
@@ -36,6 +36,10 @@
     public static final int DEFAULT_JOB_DISCOVERY_RECENCY_DAYS = 5;
     public static final int DEFAULT_PROCESS_MAX_CONCURRENCY = 20; // process at most 20 slices concurrently
     public static final long DEFAULT_RESTORE_JOB_TABLES_TTL_SECONDS = TimeUnit.DAYS.toSeconds(90);
+    public static final String RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS =
+    "restore_job_long_running_threshold_seconds";
+    // A restore job handler is considered long-running if it has been in the "active" list for 10 minutes.
+    private static final long DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS = 600;
 
     @JsonProperty(value = "job_discovery_active_loop_delay_millis")
     protected final long jobDiscoveryActiveLoopDelayMillis;
@@ -52,6 +56,11 @@
     @JsonProperty(value = "restore_job_tables_ttl_seconds")
     protected final long restoreJobTablesTtlSeconds;
 
+
+    @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
+    defaultValue = DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS + "")
+    private final long restoreJobLongRunningThresholdSeconds;
+
     protected RestoreJobConfigurationImpl()
     {
         this(builder());
@@ -64,6 +73,7 @@
         this.jobDiscoveryRecencyDays = builder.jobDiscoveryRecencyDays;
         this.processMaxConcurrency = builder.processMaxConcurrency;
         this.restoreJobTablesTtlSeconds = builder.restoreJobTablesTtlSeconds;
+        this.restoreJobLongRunningThresholdSeconds = builder.restoreJobLongRunningThresholdSeconds;
         validate();
     }
 
@@ -132,6 +142,14 @@
         return restoreJobTablesTtlSeconds;
     }
 
+    @Override
+    @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
+                  defaultValue = DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS + "")
+    public long restoreJobLongRunningHandlerThresholdSeconds()
+    {
+        return restoreJobLongRunningThresholdSeconds;
+    }
+
     public static Builder builder()
     {
         return new Builder();
@@ -142,6 +160,8 @@
      */
     public static class Builder implements DataObjectBuilder<Builder, RestoreJobConfigurationImpl>
     {
+        protected long restoreJobLongRunningThresholdSeconds =
+        DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS;
         private long jobDiscoveryActiveLoopDelayMillis = DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS;
         private long jobDiscoveryIdleLoopDelayMillis = DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS;
         private int jobDiscoveryRecencyDays = DEFAULT_JOB_DISCOVERY_RECENCY_DAYS;
@@ -218,6 +238,18 @@
             return update(b -> b.restoreJobTablesTtlSeconds = restoreJobTablesTtlSeconds);
         }
 
+        /**
+         * Sets the {@code restoreJobLongRunningThresholdSeconds} and returns a reference to this Builder enabling
+         * method chaining.
+         *
+         * @param restoreJobLongRunningThresholdSeconds the {@code restoreJobLongRunningThresholdSeconds} to set
+         * @return a reference to this Builder
+         */
+        public Builder restoreJobLongRunningThresholdSeconds(long restoreJobLongRunningThresholdSeconds)
+        {
+            return update(b -> b.restoreJobLongRunningThresholdSeconds = restoreJobLongRunningThresholdSeconds);
+        }
+
         @Override
         public RestoreJobConfigurationImpl build()
         {
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
index 7563e93..c4c7100 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
@@ -29,8 +29,6 @@
 import java.util.UUID;
 
 import com.datastax.driver.core.Row;
-import io.vertx.core.Handler;
-import io.vertx.core.Promise;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.DataObjectBuilder;
 import org.apache.cassandra.sidecar.common.data.CreateSliceRequestPayload;
@@ -40,6 +38,7 @@
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
 import org.apache.cassandra.sidecar.restore.RestoreJobUtil;
+import org.apache.cassandra.sidecar.restore.RestoreSliceHandler;
 import org.apache.cassandra.sidecar.restore.RestoreSliceTask;
 import org.apache.cassandra.sidecar.restore.RestoreSliceTracker;
 import org.apache.cassandra.sidecar.restore.StorageClient;
@@ -229,17 +228,17 @@
     /**
      * @return {@link RestoreSliceTask} of the restore slice. See {@link RestoreSliceTask} for the steps.
      */
-    public Handler<Promise<RestoreSlice>> toAsyncTask(StorageClientPool s3ClientPool,
-                                                      ExecutorPools.TaskExecutorPool executorPool,
-                                                      SSTableImporter importer,
-                                                      double requiredUsableSpacePercentage,
-                                                      RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
-                                                      RestoreJobStats stats,
-                                                      RestoreJobUtil restoreJobUtil)
+    public RestoreSliceHandler toAsyncTask(StorageClientPool s3ClientPool,
+                                           ExecutorPools.TaskExecutorPool executorPool,
+                                           SSTableImporter importer,
+                                           double requiredUsableSpacePercentage,
+                                           RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
+                                           RestoreJobStats stats,
+                                           RestoreJobUtil restoreJobUtil)
     {
         if (isCancelled)
-            return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
-                                                                                this, null));
+            return RestoreSliceTask.failed(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
+                                                                                 this, null), this);
 
         try
         {
@@ -254,13 +253,13 @@
         catch (IllegalStateException illegalState)
         {
             // The slice is not registered with a tracker, retry later.
-            return promise -> promise.tryFail(RestoreJobExceptions.ofSlice("Restore slice is not started",
-                                                                           this, illegalState));
+            return RestoreSliceTask.failed(RestoreJobExceptions.ofSlice("Restore slice is not started",
+                                                                           this, illegalState), this);
         }
         catch (Exception cause)
         {
-            return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is failed",
-                                                                                this, cause));
+            return RestoreSliceTask.failed(RestoreJobExceptions.ofFatalSlice("Restore slice is failed",
+                                                                                this, cause), this);
         }
     }
 
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
index 0602a9e..3c9e2f1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
@@ -48,6 +48,7 @@
         this.tableTtlSeconds = tableTtlSeconds;
     }
 
+    @Override
     protected void prepareStatements(@NotNull Session session)
     {
         insertSlice = prepare(insertSlice, session, CqlLiterals.insertSlice(keyspaceConfig));
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index ef36316..8651d53 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -59,7 +59,7 @@
     private static final int RESTORE_JOB_PREFIX_LEN = RESTORE_JOB_PREFIX.length();
     private static final int RESTORE_JOB_DEFAULT_HASH_SEED = 0;
 
-    private DigestAlgorithmProvider digestAlgorithmProvider;
+    private final DigestAlgorithmProvider digestAlgorithmProvider;
 
     @Inject
     public RestoreJobUtil(@Named("xxhash32") DigestAlgorithmProvider digestAlgorithmProvider)
@@ -223,4 +223,12 @@
             }
         }
     }
+
+    /**
+     * @return the current time in nanoseconds
+     */
+    public long currentTimeNanos()
+    {
+        return System.nanoTime();
+    }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index 1a438dc..edded10 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -21,8 +21,10 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -62,6 +64,8 @@
     private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
     private final RestoreJobStats stats;
     private final RestoreJobUtil restoreJobUtil;
+    private final Set<RestoreSliceHandler> activeTasks = ConcurrentHashMap.newKeySet();
+    private final long longRunningHandlerThresholdInSeconds;
 
     private volatile boolean isClosed = false; // OK to run close twice, so relax the control to volatile
 
@@ -82,6 +86,8 @@
                                                                         .processMaxConcurrency());
         this.requiredUsableSpacePercentage
         = config.serviceConfiguration().ssTableUploadConfiguration().minimumSpacePercentageRequired() / 100.0;
+        this.longRunningHandlerThresholdInSeconds = config.restoreJobConfiguration()
+                                                          .restoreJobLongRunningHandlerThresholdSeconds();
         this.importer = importer;
         this.sliceDatabaseAccessor = sliceDatabaseAccessor;
         this.stats = stats;
@@ -128,26 +134,28 @@
             if (slice == null) // it should never happen, and is only to make ide happy
             {
                 processMaxConcurrency.releasePermit();
-                return;
+                break;
             }
 
             // capture the new queue length after polling
             sliceQueue.captureImportQueueLength();
-            pool.executeBlocking(slice.toAsyncTask(s3ClientPool, pool, importer,
-                                                   requiredUsableSpacePercentage,
-                                                   sliceDatabaseAccessor, stats,
-                                                   restoreJobUtil),
-                                 false) // unordered
+            RestoreSliceHandler task = slice.toAsyncTask(s3ClientPool, pool, importer,
+                                                         requiredUsableSpacePercentage,
+                                                         sliceDatabaseAccessor, stats,
+                                                         restoreJobUtil);
+            activeTasks.add(task);
+            pool.executeBlocking(task, false) // unordered; run in parallel
             .onSuccess(restoreSlice -> {
+                int instanceId = slice.owner().id();
                 if (slice.hasImported())
                 {
-                    stats.captureSliceCompletionTime(slice.owner().id(), System.nanoTime() - slice.creationTimeNanos());
+                    stats.captureSliceCompletionTime(instanceId, System.nanoTime() - slice.creationTimeNanos());
                     LOGGER.info("Slice completes successfully. sliceKey={}", slice.key());
                     slice.complete();
                 }
                 else if (slice.hasStaged())
                 {
-                    // todo: report stat of time taken to stage
+                    stats.captureSliceStageTime(instanceId, task.elapsedInNanos());
                     LOGGER.info("Slice has been staged successfully. sliceKey={}", slice.key());
                     // the slice is not fully complete yet. Re-enqueue the slice.
                     sliceQueue.offer(slice);
@@ -186,12 +194,38 @@
                 // decrement the active slices and capture the new queue length
                 sliceQueue.decrementActiveSliceCount(slice);
                 sliceQueue.captureImportQueueLength();
+                activeTasks.remove(task);
             });
         }
         promise.tryComplete();
+        checkForLongRunningTasks();
         sliceQueue.capturePendingSliceCount();
     }
 
+    private void checkForLongRunningTasks()
+    {
+        for (RestoreSliceHandler task : activeTasks)
+        {
+            long elapsedInNanos = task.elapsedInNanos();
+            if (elapsedInNanos == -1)
+            {
+                continue;
+            }
+            long elapsedInSeconds = TimeUnit.SECONDS.convert(elapsedInNanos, TimeUnit.NANOSECONDS);
+            if (elapsedInSeconds > longRunningHandlerThresholdInSeconds)
+            {
+                LOGGER.warn("Long-running restore slice task detected. " +
+                            "elapsedSeconds={} thresholdSeconds={} sliceKey={} jobId={} status={}",
+                            elapsedInSeconds,
+                            longRunningHandlerThresholdInSeconds,
+                            task.slice().key(),
+                            task.slice().jobId(),
+                            task.slice().job().status);
+                stats.captureLongRunningRestoreHandler(task.slice().owner().id(), elapsedInNanos);
+            }
+        }
+    }
+
     @Override
     public void close()
     {
@@ -207,6 +241,12 @@
     }
 
     @VisibleForTesting
+    int activeTasks()
+    {
+        return activeTasks.size();
+    }
+
+    @VisibleForTesting
     int pendingStartSlices()
     {
         return sliceQueue.size();
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java
new file mode 100644
index 0000000..fdcf390
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.restore;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.db.RestoreSlice;
+
+/**
+ * A handler that processes a restore slice
+ */
+public interface RestoreSliceHandler extends Handler<Promise<RestoreSlice>>
+{
+    /**
+     * @return slice the handler processes
+     */
+    RestoreSlice slice();
+
+    /**
+     * @return the elapsed time in nanoseconds if the task has started processing, -1 otherwise
+     */
+    long elapsedInNanos();
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index 4e942c7..ed85d3f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -28,7 +28,6 @@
 import org.slf4j.LoggerFactory;
 
 import io.vertx.core.Future;
-import io.vertx.core.Handler;
 import io.vertx.core.Promise;
 import io.vertx.ext.web.handler.HttpException;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
@@ -59,7 +58,7 @@
  *
  * Note that the class is package private, and it is not intended to be referenced by other packages.
  */
-public class RestoreSliceTask implements Handler<Promise<RestoreSlice>>
+public class RestoreSliceTask implements RestoreSliceHandler
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(RestoreSliceTask.class);
 
@@ -71,6 +70,7 @@
     private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
     private final RestoreJobStats stats;
     private final RestoreJobUtil restoreJobUtil;
+    private long taskStartTimeNanos = -1;
 
     public RestoreSliceTask(RestoreSlice slice,
                             StorageClient s3Client,
@@ -94,9 +94,15 @@
         this.restoreJobUtil = restoreJobUtil;
     }
 
+    public static RestoreSliceHandler failed(RestoreJobException cause, RestoreSlice slice)
+    {
+        return new Failed(cause, slice);
+    }
+
     @Override
     public void handle(Promise<RestoreSlice> event)
     {
+        this.taskStartTimeNanos = restoreJobUtil.currentTimeNanos();
         if (failOnCancelled(event))
             return;
 
@@ -192,7 +198,7 @@
         .whenComplete((resp, cause) -> {
             if (cause == null)
             {
-                stats.captureSliceReplicationTime(System.nanoTime() - slice.creationTimeNanos());
+                stats.captureSliceReplicationTime(currentTimeInNanos() - slice.creationTimeNanos());
                 slice.setExistsOnS3();
                 return;
             }
@@ -244,6 +250,11 @@
         });
     }
 
+    private long currentTimeInNanos()
+    {
+        return restoreJobUtil.currentTimeNanos();
+    }
+
     private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event)
     {
         if (slice.isCancelled())
@@ -521,4 +532,53 @@
         LOGGER.warn("Committing slice failed with HttpException. slice={} statusCode={} exceptionPayload={}",
                     slice.sliceId(), httpException.getStatusCode(), httpException.getPayload(), httpException);
     }
+
+    @Override
+    public long elapsedInNanos()
+    {
+        return taskStartTimeNanos == -1 ? -1 :
+        currentTimeInNanos() - taskStartTimeNanos;
+    }
+
+    @Override
+    public RestoreSlice slice()
+    {
+        return slice;
+    }
+
+    /**
+     * A RestoreSliceHandler that immediately fails the slice/promise.
+     * Used when the processor already knows that a slice should not be processed for some reason
+     * as indicated in cause field.
+     */
+    public static class Failed implements RestoreSliceHandler
+    {
+        private final RestoreJobException cause;
+        private final RestoreSlice slice;
+
+        public Failed(RestoreJobException cause, RestoreSlice slice)
+        {
+            this.cause = cause;
+            this.slice = slice;
+        }
+
+        @Override
+        public void handle(Promise<RestoreSlice> promise)
+        {
+            promise.tryFail(cause);
+        }
+
+        @Override
+        public long elapsedInNanos()
+        {
+            // it fails immediately
+            return 0;
+        }
+
+        @Override
+        public RestoreSlice slice()
+        {
+            return slice;
+        }
+    }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java b/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
index 1a44bd8..27ca5fe 100644
--- a/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
+++ b/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
@@ -26,7 +26,7 @@
     /**
      * Captures the total time taken to complete a slice successfully
      *
-     * @param instanceId    instance that contains the slice
+     * @param instanceId    instance that is processing the slice
      * @param durationNanos duration in nanoseconds
      */
     default void captureSliceCompletionTime(int instanceId, long durationNanos)
@@ -35,6 +35,17 @@
     }
 
     /**
+     * Captures the total time taken to stage the slice
+     *
+     * @param instanceId    instance that contains the slice
+     * @param durationNanos duration in nanoseconds
+     */
+    default void captureSliceStageTime(int instanceId, long durationNanos)
+    {
+
+    }
+
+    /**
      * Captures the time taken to import SSTable(s) in a slice
      *
      * @param instanceId    instance that owns the slice
@@ -217,4 +228,14 @@
     {
 
     }
+
+    /**
+     * Captures a long-running restore job handler
+     * @param instanceId      instance that is processing the slice
+     * @param handlerDuration restore job current duration in nanoseconds
+     */
+    default void captureLongRunningRestoreHandler(int instanceId, long handlerDuration)
+    {
+
+    }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
index 4628a14..5b23031 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
@@ -55,6 +55,7 @@
                     .collect(Collectors.toList());
     }
 
+    @Override
     protected int getNumInstancesToManage(int clusterSize)
     {
         return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first 2 instances in the "cluster"
diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
index ef76ffa..d0c069d 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
@@ -118,6 +118,7 @@
          * @return instance meta information
          * @throws NoSuchElementException when the instance with {@code id} does not exist
          */
+        @Override
         public InstanceMetadata instanceFromId(int id) throws NoSuchElementException
         {
             return cassandraTestContext.instancesConfig().instanceFromId(id);
@@ -130,6 +131,7 @@
          * @return instance meta information
          * @throws NoSuchElementException when the instance for {@code host} does not exist
          */
+        @Override
         public InstanceMetadata instanceFromHost(String host) throws NoSuchElementException
         {
             return cassandraTestContext.instancesConfig().instanceFromHost(host);
diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
index 3bdeead..2dbf616 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
@@ -48,11 +48,13 @@
     private final HashedWheelTimer sharedHWT = new HashedWheelTimer(threadFactory);
     private final EventLoopGroup sharedEventLoopGroup = new NioEventLoopGroup(0, threadFactory);
 
+    @Override
     public EventLoopGroup eventLoopGroup(ThreadFactory threadFactory)
     {
         return sharedEventLoopGroup;
     }
 
+    @Override
     public void onClusterClose(EventLoopGroup eventLoopGroup)
     {
     }
@@ -63,6 +65,7 @@
         return sharedHWT;
     }
 
+    @Override
     public void onClusterClose(Timer timer)
     {
     }
diff --git a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
index d7cd14b..0e90ae8 100644
--- a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
+++ b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
@@ -100,6 +100,7 @@
     }
 
 
+    @Override
     public int compareTo(SimpleCassandraVersion other)
     {
         if (major < other.major)
diff --git a/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java b/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
index 641ba55..2939d45 100644
--- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
@@ -30,6 +30,7 @@
 @ExtendWith(VertxExtension.class)
 public class HealthServiceSslTest extends AbstractHealthServiceTest
 {
+    @Override
     public boolean isSslEnabled()
     {
         return true;
diff --git a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java b/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
index ef088db..ac4a942 100644
--- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
@@ -30,6 +30,7 @@
 @ExtendWith(VertxExtension.class)
 public class HealthServiceTest extends AbstractHealthServiceTest
 {
+    @Override
     public boolean isSslEnabled()
     {
         return false;
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 518fc70..68bb4d8 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -112,6 +112,7 @@
         RestoreJobConfigurationImpl.builder()
                                    .restoreJobTablesTtlSeconds(TimeUnit.DAYS.toSeconds(14) + 1)
                                    .processMaxConcurrency(RESTORE_MAX_CONCURRENCY)
+                                   .restoreJobLongRunningThresholdSeconds(1)
                                    .build();
         HealthCheckConfiguration healthCheckConfiguration = new HealthCheckConfigurationImpl(200, 1000);
         return SidecarConfigurationImpl.builder()
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
index c3fafc8..beea547 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
@@ -19,6 +19,9 @@
 package org.apache.cassandra.sidecar.restore;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.BeforeEach;
@@ -28,7 +31,10 @@
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.util.Modules;
+import io.vertx.core.Promise;
 import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 import org.apache.cassandra.sidecar.server.MainModule;
@@ -170,14 +176,74 @@
         });
     }
 
+    @Test
+    public void testLongRunningHandlerDetection()
+    {
+
+        when(sidecarSchema.isInitialized()).thenReturn(true);
+        periodicTaskExecutor.schedule(processor);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicLong currentTime = new AtomicLong(0);
+        RestoreSlice slice = mockSlowSlice(latch, currentTime::get); // Sets the start time
+        long fiveMinutesInNanos = TimeUnit.NANOSECONDS.convert(5, TimeUnit.MINUTES);
+        currentTime.set(fiveMinutesInNanos);
+        processor.submit(slice);
+        loopAssert(3, () -> {
+            assertThat(stats.longRunningRestoreHandlers.size()).isEqualTo(1);
+            Long handlerTimeInNanos = stats.longRunningRestoreHandlers.get(slice.owner().id());
+            assertThat(handlerTimeInNanos).isNotNull();
+            assertThat(handlerTimeInNanos).isEqualTo(fiveMinutesInNanos);
+            assertThat(processor.activeTasks()).isOne();
+        });
+
+        // Make slice completable.
+        latch.countDown();
+
+        // Make sure when the slice completes the active handler is removed
+        loopAssert(3, () -> {
+            assertThat(processor.activeTasks()).isZero();
+        });
+    }
+
     private RestoreSlice mockSlowSlice(CountDownLatch latch)
     {
+        return mockSlowSlice(latch, System::nanoTime);
+    }
+
+    private RestoreSlice mockSlowSlice(CountDownLatch latch, Supplier<Long> timeInNanosSupplier)
+    {
         RestoreSlice slice = mock(RestoreSlice.class, Mockito.RETURNS_DEEP_STUBS);
         when(slice.jobId()).thenReturn(UUIDs.timeBased());
         when(slice.owner().id()).thenReturn(1);
-        when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any())).thenReturn(promise -> {
-            Uninterruptibles.awaitUninterruptibly(latch);
-            promise.complete(slice);
+        when(slice.key()).thenReturn("SliceKey");
+        RestoreJob job = RestoreJob.builder()
+                                   .jobStatus(RestoreJobStatus.CREATED)
+                                   .build();
+        when(slice.job()).thenReturn(job);
+        when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any())).thenReturn(
+        new RestoreSliceHandler()
+        {
+            private Long startTime = timeInNanosSupplier.get();
+
+            @Override
+            public void handle(Promise<RestoreSlice> promise)
+            {
+                Uninterruptibles.awaitUninterruptibly(latch);
+                promise.complete(slice);
+            }
+
+            @Override
+            public long elapsedInNanos()
+            {
+                return timeInNanosSupplier.get() - startTime;
+            }
+
+            @Override
+            public RestoreSlice slice()
+            {
+                return slice;
+            }
         });
         when(slice.hasImported()).thenReturn(true);
         return slice;
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 6e9e1d7..dcfd6a1 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -25,6 +25,8 @@
 import java.nio.file.Paths;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -67,6 +69,7 @@
     private TaskExecutorPool executorPool;
     private TestRestoreJobStats stats;
     private TestRestoreSliceAccessor sliceDatabaseAccessor;
+    private RestoreJobUtil util;
 
     @BeforeEach
     void setup()
@@ -82,6 +85,7 @@
         mockSSTableImporter = mock(SSTableImporter.class);
         executorPool = new ExecutorPools(Vertx.vertx(), new ServiceConfigurationImpl()).internal();
         stats = new TestRestoreJobStats();
+        util = mock(RestoreJobUtil.class);
         sliceDatabaseAccessor = new TestRestoreSliceAccessor();
     }
 
@@ -302,14 +306,33 @@
         .hasMessageContaining("Random exception");
     }
 
+    @Test
+    void testSliceDuration()
+    {
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED, "QUORUM");
+        AtomicLong currentNanos = new AtomicLong(0);
+        RestoreSliceTask task = createTask(mockSlice, job, currentNanos::get);
+        Promise<RestoreSlice> promise = Promise.promise();
+        task.handle(promise); // Task isn't considered started until it `handle` is called
+        currentNanos.set(123L);
+        assertThat(task.elapsedInNanos()).isEqualTo(123L);
+    }
+
     private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job)
     {
+        return createTask(slice, job, System::nanoTime);
+    }
+
+    private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job, Supplier<Long> currentNanoTimeSupplier)
+    {
         when(slice.job()).thenReturn(job);
         assertThat(slice.job()).isSameAs(job);
         assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
         assertThat(slice.job().status).isEqualTo(job.status);
+        RestoreJobUtil util = mock(RestoreJobUtil.class);
+        when(util.currentTimeNanos()).thenAnswer(invok -> currentNanoTimeSupplier.get());
         return new TestRestoreSliceTask(slice, mockStorageClient, executorPool, mockSSTableImporter,
-                                        0, sliceDatabaseAccessor, stats);
+                                        0, sliceDatabaseAccessor, stats, util);
     }
 
     private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, RestoreJob job)
@@ -319,7 +342,7 @@
         assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
         assertThat(slice.job().status).isEqualTo(job.status);
         return new TestUnexpectedExceptionInRestoreSliceTask(slice, mockStorageClient, executorPool,
-                                                             mockSSTableImporter, 0, sliceDatabaseAccessor, stats);
+                                                             mockSSTableImporter, 0, sliceDatabaseAccessor, stats, util);
     }
 
     static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor
@@ -346,10 +369,11 @@
 
         public TestRestoreSliceTask(RestoreSlice slice, StorageClient s3Client, TaskExecutorPool executorPool,
                                     SSTableImporter importer, double requiredUsableSpacePercentage,
-                                    RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats)
+                                    RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats,
+                                    RestoreJobUtil restoreJobUtil)
         {
-            super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats,
-                  null);
+            super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage,
+                  sliceDatabaseAccessor, stats, restoreJobUtil);
             this.slice = slice;
             this.stats = stats;
         }
@@ -382,10 +406,10 @@
                                                          TaskExecutorPool executorPool, SSTableImporter importer,
                                                          double requiredUsableSpacePercentage,
                                                          RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
-                                                         RestoreJobStats stats)
+                                                         RestoreJobStats stats, RestoreJobUtil util)
         {
             super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats,
-                  null);
+                  util);
         }
 
         @Override
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
index 30bf04b..6cf72c1 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
@@ -240,6 +240,7 @@
             super(Vertx.vertx(), 1, null, null, null, null, null, "localhost", 9043);
         }
 
+        @Override
         protected JmxNotificationListener initializeJmxListener()
         {
             return null;
diff --git a/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java b/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
index 713c33c..0a2cd9e 100644
--- a/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
+++ b/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
@@ -19,7 +19,9 @@
 package org.apache.cassandra.sidecar.stats;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Test implementation for testing restore job related stats captured
@@ -39,6 +41,7 @@
     public long failedJobCount;
     public long activeJobCount;
     public long tokenRefreshCount;
+    public Map<Integer, Long> longRunningRestoreHandlers = new HashMap<>();
 
     @Override
     public void captureSliceCompletionTime(int instanceId, long durationNanos)
@@ -118,4 +121,10 @@
     {
         tokenRefreshCount += 1;
     }
+
+    @Override
+    public void captureLongRunningRestoreHandler(int instanceId, long handlerDuration)
+    {
+        longRunningRestoreHandlers.put(instanceId, handlerDuration);
+    }
 }