diff --git a/CHANGES.txt b/CHANGES.txt
index 43de823..d8813da 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Detect out of range data and cleanup using nodetool (CASSANDRASC-134)
  * Allow optional reason to abort restore jobs (CASSANDRASC-133)
  * Fix SidecarLoadBalancingPolicy unexpectedly removing local node and improve CI stability (CASSANDRASC-131)
  * Reduce implementations accessible from client (CASSANDRASC-127)
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
index 3a87e47..9903a0d 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
@@ -24,6 +24,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +33,7 @@
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
 import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.sidecar.common.server.data.Name;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.apache.cassandra.sidecar.common.server.exceptions.NodeBootstrappingException;
@@ -201,4 +203,14 @@
         }
         return dataFileLocations;
     }
+
+    @Override
+    public void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String table, int concurrency)
+    throws IOException, ExecutionException, InterruptedException
+    {
+        requireNonNull(keyspace, "keyspace must be non-null");
+        requireNonNull(table, "table must be non-null");
+        jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
+                 .forceKeyspaceCleanup(concurrency, keyspace, table);
+    }
 }
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
index d030514..8982c1b 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,6 +140,12 @@
         return delegate.getAllDataFileLocations();
     }
 
+    @Override
+    public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
+    {
+        return delegate.forceKeyspaceCleanup(jobs, keyspaceName, tables);
+    }
+
     /**
      * Ensures that gossip is running on the Cassandra instance
      *
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
index eba3f2f..d8f9412 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 /**
  * An interface that pulls methods from the Cassandra Storage Service Proxy
@@ -146,4 +147,16 @@
      * @return String array of all locations
      */
     String[] getAllDataFileLocations();
+
+    /**
+     * Force cleanup the data of the tables in the keyspace. All partitions that out of the range are removed
+     * @param jobs job concurrency
+     * @param keyspaceName keyspace of the table to clean
+     * @param tables tables to clean
+     * @return status code. 0: success; 1: aborted; 2: unable to cancel
+     * @throws IOException i/o exception during cleanup
+     * @throws ExecutionException it does not really throw but declared in MBean
+     * @throws InterruptedException it does not really throw but declared in MBean
+     */
+    int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException;
 }
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
index ae1e6ce..dd1d497 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
@@ -41,6 +41,7 @@
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse.ReplicaInfo;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.sidecar.common.server.data.Name;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.apache.cassandra.sidecar.common.server.utils.GossipInfoParser;
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java
index a46437c..fdef559 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java
@@ -36,6 +36,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.jetbrains.annotations.NotNull;
 
 
diff --git a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
index 91eb809..69915b5 100644
--- a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
+++ b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
@@ -33,6 +33,7 @@
 
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.sidecar.common.server.data.Name;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.assertj.core.api.InstanceOfAssertFactories;
diff --git a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java
index 2ebcbeb..3e96594 100644
--- a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java
+++ b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java
@@ -30,6 +30,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
index 1c00932..d293529 100644
--- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
+++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
@@ -18,9 +18,11 @@
 
 package org.apache.cassandra.sidecar.common.server;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.sidecar.common.response.RingResponse;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
@@ -76,4 +78,27 @@
      * @return the list of all data file locations for the Cassandra instance
      */
     List<String> dataFileLocations();
+
+    /**
+     * Clean up the data of the specified and remove the keys no longer belongs to the Cassandra node.
+     *
+     * @param keyspace keyspace of the table to clean
+     * @param table table to clean
+     * @param concurrency concurrency of the cleanup (compaction) job.
+     *                    Note that it cannot exceed the configured `concurrent_compactors` in Cassandra
+     * @throws IOException i/o exception during cleanup
+     * @throws ExecutionException it does not really throw but declared in MBean
+     * @throws InterruptedException it does not really throw but declared in MBean
+     */
+    void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String table, int concurrency)
+    throws IOException, ExecutionException, InterruptedException;
+
+    /**
+     * Similar to {@link #outOfRangeDataCleanup(String, String, int)}, but use 1 for concurrency
+     */
+    default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String table)
+    throws IOException, ExecutionException, InterruptedException
+    {
+        outOfRangeDataCleanup(keyspace, table, 1);
+    }
 }
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Partitioner.java
similarity index 95%
rename from adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java
rename to server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Partitioner.java
index 2f4687b..3959593 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java
+++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Partitioner.java
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.adapters.base;
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
 
 import java.math.BigInteger;
 
diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Token.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Token.java
new file mode 100644
index 0000000..f8464ac
--- /dev/null
+++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Token.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
+
+import java.math.BigInteger;
+import java.util.Comparator;
+import java.util.Objects;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Token, i.e. hashed partition key, in Cassandra
+ */
+public final class Token implements Comparable<Token>
+{
+    private static final Comparator<Token> TOKEN_COMPARATOR = Comparator.comparing(Token::toBigInteger);
+
+    private final BigInteger value;
+
+    /**
+     * Create token from {@code BigInteger} value
+     * @param value token value
+     * @return token
+     */
+    public static Token from(BigInteger value)
+    {
+        return new Token(value);
+    }
+
+    /**
+     * Create token from its string literal
+     * @param valueStr token value
+     * @throws NumberFormatException {@code valueStr} is not a valid representation
+     *         of a BigInteger.
+     * @return token
+     */
+    public static Token from(String valueStr)
+    {
+        return new Token(new BigInteger(valueStr));
+    }
+
+    /**
+     * Create token from long value
+     * @param value token value
+     * @return token
+     */
+    public static Token from(long value)
+    {
+        return new Token(BigInteger.valueOf(value));
+    }
+
+    private Token(BigInteger value)
+    {
+        this.value = value;
+    }
+
+    /**
+     * @return the {@code BigInteger} value
+     */
+    public BigInteger toBigInteger()
+    {
+        return value;
+    }
+
+    /**
+     * @return a new instance of token whose value is {@code (this + 1)}
+     */
+    public Token increment()
+    {
+        return new Token(value.add(BigInteger.ONE));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        Token token = (Token) o;
+        return Objects.equals(value, token.value);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return value.hashCode();
+    }
+
+    @Override
+    public int compareTo(@NotNull Token other)
+    {
+        return Objects.compare(this, Objects.requireNonNull(other), TOKEN_COMPARATOR);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Token(" + value + ')';
+    }
+}
diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java
new file mode 100644
index 0000000..5abc104
--- /dev/null
+++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Range;
+
+import com.datastax.driver.core.DataType;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Range: (start, end] - start exclusive and end inclusive
+ */
+public class TokenRange
+{
+    private final Range<Token> range;
+    private volatile Token firstToken = null;
+
+    /**
+     * Unwrap the java driver's token range if necessary and convert the unwrapped ranges list.
+     * Only the token ranges from Murmur3Partitioner and RandomPartitioner are supported.
+     *
+     * @param dsTokenRange TokenRange implementation in Cassandra java driver
+     * @return list of token ranges. If the input token range wraps around, the size of the list is 2;
+     * otherwise, the list has only one range
+     */
+    public static List<TokenRange> from(com.datastax.driver.core.TokenRange dsTokenRange)
+    {
+        DataType tokenDataType = dsTokenRange.getStart().getType();
+        if (tokenDataType == DataType.varint()) // BigInteger - RandomPartitioner
+        {
+            return dsTokenRange.unwrap()
+                               .stream()
+                               .map(range -> {
+                                   BigInteger start = (BigInteger) range.getStart().getValue();
+                                   BigInteger end = (BigInteger) range.getEnd().getValue();
+                                   if (end.compareTo(Partitioner.Random.minToken) == 0)
+                                   {
+                                       end = Partitioner.Random.maxToken;
+                                   }
+                                   return new TokenRange(start, end);
+                               })
+                               .collect(Collectors.toList());
+        }
+        else if (tokenDataType == DataType.bigint()) // Long - Murmur3Partitioner
+        {
+            return dsTokenRange.unwrap()
+                               .stream()
+                               .map(range -> {
+                                   BigInteger start = BigInteger.valueOf((Long) range.getStart().getValue());
+                                   BigInteger end = BigInteger.valueOf((Long) range.getEnd().getValue());
+                                   if (end.compareTo(Partitioner.Murmur3.minToken) == 0)
+                                   {
+                                       end = Partitioner.Murmur3.maxToken;
+                                   }
+                                   return new TokenRange(start, end);
+                               })
+                               .collect(Collectors.toList());
+        }
+        else
+        {
+            throw new IllegalArgumentException(
+            "Unsupported token type: " + tokenDataType +
+            ". Only tokens of Murmur3Partitioner and RandomPartitioner are supported.");
+        }
+    }
+
+    public TokenRange(long start, long end)
+    {
+        this(BigInteger.valueOf(start), BigInteger.valueOf(end));
+    }
+
+    public TokenRange(BigInteger start, BigInteger end)
+    {
+        this(Token.from(start), Token.from(end));
+    }
+
+    public TokenRange(Token start, Token end)
+    {
+        this.range = Range.openClosed(start, end);
+    }
+
+    /**
+     * @return start token. It is not enclosed in the range.
+     */
+    public Token start()
+    {
+        return range.lowerEndpoint();
+    }
+
+    /**
+     * @return end token. It is the last token enclosed in the range.
+     */
+    public Token end()
+    {
+        return range.upperEndpoint();
+    }
+
+    /**
+     * @return the first token enclosed in the range
+     */
+    @Nullable
+    public Token firstToken()
+    {
+        if (range.isEmpty())
+        {
+            return null;
+        }
+
+        // it is ok to race
+        if (firstToken == null)
+        {
+            firstToken = range.lowerEndpoint().increment();
+        }
+        return firstToken;
+    }
+
+    /**
+     * Test if this range encloses the other range.
+     * It simply delegates to {@link Range#encloses(Range)}
+     */
+    public boolean encloses(TokenRange other)
+    {
+        return this.range.encloses(other.range);
+    }
+
+    /**
+     * Two ranges are overlapping when their intersection is non-empty. For example,
+     *
+     * Ranges (0, 3] and (1, 4] are overlapping. The intersection is (1, 3]
+     * Ranges (0, 3] and (5, 7] are not overlapping, as there is no intersection
+     * Ranges (0, 3] and (3, 5] are not overlapping, as the intersection (3, 3] is empty
+     *
+     * Note that the semantics is different from {@link Range#isConnected(Range)}
+     *
+     * @return true if this range overlaps with the other range; otherwise, false
+     */
+    public boolean overlaps(TokenRange other)
+    {
+        return this.range.lowerEndpoint().compareTo(other.range.upperEndpoint()) < 0
+               && other.range.lowerEndpoint().compareTo(this.range.upperEndpoint()) < 0;
+    }
+
+    public TokenRange intersection(TokenRange overlapping)
+    {
+        Range<Token> overlap = this.range.intersection(overlapping.range);
+        return new TokenRange(overlap.lowerEndpoint(), overlap.upperEndpoint());
+    }
+
+    /**
+     * Determine whether all tokens in this range are larger than the ones in the other token range
+     * @param other token range
+     * @return true if the start token of this range is larger or equals to the other range's end token; otherwise, false
+     */
+    public boolean largerThan(TokenRange other)
+    {
+        return this.start().compareTo(other.end()) >= 0;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        TokenRange that = (TokenRange) o;
+        return Objects.equals(range, that.range);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return range.hashCode();
+    }
+}
diff --git a/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeTest.java b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeTest.java
new file mode 100644
index 0000000..103ff93
--- /dev/null
+++ b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TokenRangeTest
+{
+    @Test
+    void testEquals()
+    {
+        TokenRange r1 = new TokenRange(1, 100);
+        TokenRange r2 = new TokenRange(1, 100);
+        TokenRange r3 = new TokenRange(-10, 10);
+        assertThat(r1).isEqualTo(r2);
+        assertThat(r3).isNotEqualTo(r1)
+                      .isNotEqualTo(r2);
+    }
+
+    @Test
+    void testFirstToken()
+    {
+        TokenRange range = new TokenRange(1, 100);
+        assertThat(range.firstToken()).isEqualTo(Token.from(2));
+        // test the first token refer is the same
+        assertThat(range.firstToken()).isSameAs(range.firstToken());
+
+        TokenRange emptyRange = new TokenRange(1, 1);
+        assertThat(emptyRange.firstToken()).isNull();
+    }
+
+    @Test
+    void testCreateRangeWithInvalidParams()
+    {
+        assertThatThrownBy(() -> new TokenRange(1, -1))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid range: (Token(1)‥Token(-1)]");
+    }
+
+    @Test
+    void testCreateFromJavaDriverTokenRange()
+    {
+        com.datastax.driver.core.TokenRange ordinaryRange = mockRange(1L, 100L);
+        when(ordinaryRange.isWrappedAround()).thenReturn(false);
+        when(ordinaryRange.unwrap()).thenCallRealMethod();
+        List<TokenRange> ranges = TokenRange.from(ordinaryRange);
+        assertThat(ranges).hasSize(1)
+                          .isEqualTo(Collections.singletonList(new TokenRange(1, 100)));
+    }
+
+    @Test
+    void testCreateFromWraparoundJavaDriverTokenRange()
+    {
+        com.datastax.driver.core.TokenRange range = mockRange(10L, -10L);
+        List<com.datastax.driver.core.TokenRange> unwrapped = Arrays.asList(mockRange(10L, Long.MAX_VALUE),
+                                                                            mockRange(Long.MIN_VALUE, -10L));
+        when(range.unwrap()).thenReturn(unwrapped);
+        List<TokenRange> ranges = TokenRange.from(range);
+        assertThat(ranges).hasSize(2)
+                          .isEqualTo(Arrays.asList(new TokenRange(10, Long.MAX_VALUE),
+                                                   new TokenRange(Long.MIN_VALUE, -10L)));
+    }
+
+    @Test
+    void testCreateFromWraparoundJavaDriverTokenRangeEndingInMinToken()
+    {
+        com.datastax.driver.core.TokenRange range = mockRange(10L, Long.MIN_VALUE);
+        // Java driver's token range considers the range is no a wraparound, if the end is the minimum token
+        when(range.unwrap()).thenReturn(Collections.singletonList(range));
+        List<TokenRange> ranges = TokenRange.from(range);
+        assertThat(ranges).hasSize(1)
+                          .isEqualTo(Collections.singletonList(new TokenRange(10L, Long.MAX_VALUE)));
+    }
+
+    @Test
+    void testRangeEnclose()
+    {
+        TokenRange r1 = new TokenRange(3, 5);
+        TokenRange r2 = new TokenRange(1, 10);
+        TokenRange r3 = new TokenRange(10, 11);
+        TokenRange r4 = new TokenRange(4, 11);
+        assertThat(r2.encloses(r1)).isTrue();
+        assertThat(r4.encloses(r3)).isTrue();
+        assertThat(r1.encloses(r2)).isFalse();
+        assertThat(r3.encloses(r1)).isFalse();
+        assertThat(r2.encloses(r3)).isFalse();
+        assertThat(r1.encloses(r4)).isFalse();
+        assertThat(r4.encloses(r1)).isFalse();
+    }
+
+    @Test
+    void testOverlaps()
+    {
+        TokenRange r1 = new TokenRange(3, 5);
+        TokenRange r2 = new TokenRange(1, 10);
+        TokenRange r3 = new TokenRange(10, 11);
+        TokenRange r4 = new TokenRange(4, 11);
+        assertThat(r1.overlaps(r2)).isTrue();
+        assertThat(r2.overlaps(r1)).isTrue();
+        assertThat(r3.overlaps(r4)).isTrue();
+        assertThat(r4.overlaps(r3)).isTrue();
+        assertThat(r2.overlaps(r4)).isTrue();
+        assertThat(r4.overlaps(r2)).isTrue();
+        assertThat(r2.overlaps(r3)).isFalse();
+        assertThat(r3.overlaps(r2)).isFalse();
+    }
+
+    @Test
+    void testLargerThan()
+    {
+        TokenRange r1 = new TokenRange(3, 5);
+        TokenRange r2 = new TokenRange(1, 10);
+        TokenRange r3 = new TokenRange(10, 11);
+        assertThat(r1.largerThan(r2)).isFalse();
+        assertThat(r2.largerThan(r1)).isFalse();
+        assertThat(r3.largerThan(r1)).isTrue();
+        assertThat(r1.largerThan(r3)).isFalse();
+        assertThat(r3.largerThan(r2)).isTrue();
+        assertThat(r2.largerThan(r3)).isFalse();
+    }
+
+    @Test
+    void testIntersection()
+    {
+        TokenRange r1 = new TokenRange(3, 5);
+        TokenRange r2 = new TokenRange(1, 10);
+        TokenRange r3 = new TokenRange(4, 6);
+        TokenRange r4 = new TokenRange(10, 11);
+        assertThat(r1.intersection(r2)).isEqualTo(r1);
+        assertThat(r2.intersection(r1)).isEqualTo(r1);
+        assertThat(r1.intersection(r3)).isEqualTo(new TokenRange(4, 5));
+        assertThat(r3.intersection(r1)).isEqualTo(new TokenRange(4, 5));
+        assertThat(r2.intersection(r4)).isEqualTo(new TokenRange(10, 10)); // empty range
+        assertThat(r2.intersection(r4)).isNotEqualTo(new TokenRange(5, 5)); // but not any empty range
+    }
+
+    private com.datastax.driver.core.TokenRange mockRange(long start, long end)
+    {
+        com.datastax.driver.core.TokenRange range = mock(com.datastax.driver.core.TokenRange.class);
+        com.datastax.driver.core.Token startToken = mockToken(start);
+        when(range.getStart()).thenReturn(startToken);
+        com.datastax.driver.core.Token endToken = mockToken(end);
+        when(range.getEnd()).thenReturn(endToken);
+        return range;
+    }
+
+    private com.datastax.driver.core.Token mockToken(long value)
+    {
+        com.datastax.driver.core.Token token = mock(com.datastax.driver.core.Token.class);
+        when(token.getType()).thenReturn(com.datastax.driver.core.DataType.bigint());
+        when(token.getValue()).thenReturn(value);
+        return token;
+    }
+}
diff --git a/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenTest.java b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenTest.java
new file mode 100644
index 0000000..a11010f
--- /dev/null
+++ b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
+
+import java.math.BigInteger;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class TokenTest
+{
+    @Test
+    void testCreateToken()
+    {
+        Token t1 = Token.from(1);
+        Token t2 = Token.from(BigInteger.ONE);
+        Token t3 = Token.from("1");
+        Token t4 = Token.from(1L);
+        assertThat(t1).isEqualTo(t2).isEqualTo(t3).isEqualTo(t4);
+    }
+
+    @Test
+    void testIncrement()
+    {
+        Token t1 = Token.from(1);
+        Token t2 = t1.increment();
+        assertThat(t2).isEqualTo(Token.from(2));
+    }
+
+    @Test
+    void testCompare()
+    {
+        Token t1 = Token.from(1);
+        Token t2 = Token.from(2);
+        Token t3 = Token.from(1);
+        assertThat(t1).isLessThan(t2)
+                      .isEqualByComparingTo(t3);
+        assertThat(t2).isGreaterThan(t1);
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
index 57b1bcb..8f8bc8d 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
@@ -37,6 +37,7 @@
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.restore.RestoreJobUtil;
 import org.apache.cassandra.sidecar.restore.RestoreSliceHandler;
@@ -207,6 +208,14 @@
         failAtInstance(owner().id());
     }
 
+    /**
+     * Request to clean up out of range data. It is requested when detecting the slice contains out of range data
+     */
+    public void requestOutOfRangeDataCleanup()
+    {
+        tracker.requestOutOfRangeDataCleanup();
+    }
+
     public void setExistsOnS3()
     {
         this.existsOnS3 = true;
@@ -234,6 +243,7 @@
                                            double requiredUsableSpacePercentage,
                                            RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
                                            RestoreJobUtil restoreJobUtil,
+                                           LocalTokenRangesProvider localTokenRangesProvider,
                                            SidecarMetrics metrics)
     {
         if (isCancelled)
@@ -248,6 +258,7 @@
                                         requiredUsableSpacePercentage,
                                         sliceDatabaseAccessor,
                                         restoreJobUtil,
+                                        localTokenRangesProvider,
                                         metrics);
         }
         catch (IllegalStateException illegalState)
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
index d0173bd..1e23e2b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.sidecar.db;
 
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -29,6 +28,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.db.schema.RestoreSlicesSchema;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 
@@ -88,16 +88,15 @@
         return slice;
     }
 
-    public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, short bucketId,
-                                                              BigInteger startToken, BigInteger endToken)
+    public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, short bucketId, TokenRange range)
     {
         sidecarSchema.ensureInitialized();
 
         BoundStatement statement = restoreSlicesSchema.findAllByTokenRange()
                                                       .bind(jobId,
                                                             bucketId,
-                                                            startToken,
-                                                            endToken);
+                                                            range.start().toBigInteger(),
+                                                            range.end().toBigInteger());
         ResultSet result = execute(statement);
         List<RestoreSlice> slices = new ArrayList<>();
         for (Row row : result)
diff --git a/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
index 654544a..196ba7b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
+++ b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
@@ -47,6 +47,7 @@
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.jetbrains.annotations.NotNull;
 
@@ -82,7 +83,6 @@
     }
 
     @Override
-    @Nullable
     public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace)
     {
         List<InstanceMetadata> localInstances = instancesConfig.instances();
@@ -153,7 +153,6 @@
     /**
      * Reload the locally cached token ranges when needed
      */
-    @Nullable
     private synchronized Map<Integer, Set<TokenRange>> getCacheOrReload(Metadata metadata,
                                                                         String keyspace,
                                                                         Set<Integer> localInstanceIds,
@@ -167,7 +166,7 @@
             && localTokenRangesCache.containsKey(keyspace)
             && isClusterTheSame)
         {
-            return localTokenRangesCache.get(keyspace);
+            return localTokenRangesCache.getOrDefault(keyspace, Collections.emptyMap());
         }
 
         // otherwise, reload the token ranges
@@ -226,7 +225,7 @@
         {
             LOGGER.warn("Unable to determine local instances from client meta-data!");
         }
-        return localTokenRangesCache.get(keyspace);
+        return localTokenRangesCache.getOrDefault(keyspace, Collections.emptyMap());
     }
 
     private static class IpAddressAndPort
diff --git a/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
index cef04f6..e492013 100644
--- a/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
+++ b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
@@ -20,7 +20,8 @@
 
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
+
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 
 /**
  * Provides the token ranges of the local Cassandra instance(s)
@@ -30,12 +31,12 @@
     /**
      * Calculate the token ranges owned and replicated to the local Cassandra instance(s).
      * When Sidecar is paired with multiple Cassandra instance, the ranges of each Cassandra instance is captured
-     * in the form of map, where the key is the instance id and the value is the ranges of the instance. When Sidecar
-     * is paired with a single Cassandra instance, the result map has a single entry.
+     * in the form of map, where the key is the instance id and the value is the ranges of the Cassandra instance.
+     * When Cassandra is not running with VNode, the set of ranges has a single value.
+     * When Sidecar is paired with a single Cassandra instance, the result map has a single entry.
      *
      * @param keyspace keyspace to determine replication
-     * @return token ranges of the local Cassandra instances
+     * @return token ranges of the local Cassandra instances or an empty map of nothing is found
      */
-    @Nullable
     Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace);
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/locator/TokenRange.java b/src/main/java/org/apache/cassandra/sidecar/locator/TokenRange.java
deleted file mode 100644
index c0564db..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/locator/TokenRange.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.locator;
-
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import com.datastax.driver.core.DataType;
-
-/**
- * Range: (start, end] - start exclusive and end inclusive
- */
-public class TokenRange
-{
-    public final BigInteger start;
-    public final BigInteger end;
-
-    /**
-     * Unwrap the java driver's token range if necessary and convert the unwrapped ranges list.
-     * Only the token ranges from Murmur3Partitioner and RandomPartitioner are supported.
-     *
-     * @param dsTokenRange TokenRange implementation in Cassandra java driver
-     * @return list of token ranges. If the input token range wraps around, the size of the list is 2;
-     * otherwise, the list has only one range
-     */
-    public static List<TokenRange> from(com.datastax.driver.core.TokenRange dsTokenRange)
-    {
-        DataType tokenDataType = dsTokenRange.getStart().getType();
-        if (tokenDataType == DataType.varint()) // BigInteger - RandomPartitioner
-        {
-            return dsTokenRange.unwrap()
-                               .stream()
-                               .map(range -> new TokenRange((BigInteger) range.getStart().getValue(),
-                                                            (BigInteger) range.getEnd().getValue()))
-                               .collect(Collectors.toList());
-        }
-        else if (tokenDataType == DataType.bigint()) // Long - Murmur3Partitioner
-        {
-            return dsTokenRange.unwrap()
-                               .stream()
-                               .map(range -> new TokenRange((Long) range.getStart().getValue(),
-                                                            (Long) range.getEnd().getValue()))
-                               .collect(Collectors.toList());
-        }
-        else
-        {
-            throw new IllegalArgumentException(
-            "Unsupported token type: " + tokenDataType +
-            ". Only tokens of Murmur3Partitioner and RandomPartitioner are supported.");
-        }
-    }
-
-    public TokenRange(long start, long end)
-    {
-        this(BigInteger.valueOf(start), BigInteger.valueOf(end));
-    }
-
-    public TokenRange(BigInteger start, BigInteger end)
-    {
-        this.start = start;
-        this.end = end;
-    }
-
-    public BigInteger start()
-    {
-        return this.start;
-    }
-
-    public BigInteger end()
-    {
-        return this.end;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-        {
-            return true;
-        }
-
-        if (o == null || getClass() != o.getClass())
-        {
-            return false;
-        }
-
-        TokenRange that = (TokenRange) o;
-        return Objects.equals(start, that.start) && Objects.equals(end, that.end);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(start, end);
-    }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index c5d007a..46218ad 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -37,6 +37,7 @@
 import io.vertx.core.Promise;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.db.RestoreJob;
@@ -46,7 +47,6 @@
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
 import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges;
 import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
-import org.apache.cassandra.sidecar.locator.TokenRange;
 import org.apache.cassandra.sidecar.metrics.RestoreMetrics;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.tasks.PeriodicTask;
@@ -264,7 +264,7 @@
     {
         short bucketId = 0; // TODO: update the implementation to pick proper bucketId
         restoreSliceDatabaseAccessor
-        .selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId, range.start, range.end)
+        .selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId, range)
         .forEach(slice -> {
             // set the owner instance, which is not read from database
             slice = slice.unbuild().ownerInstance(instance).build();
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java
index cb68a8a..b47f9f0 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java
@@ -24,18 +24,17 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.vertx.core.CompositeFuture;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import io.vertx.core.Future;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
@@ -54,8 +53,10 @@
 public class RestoreJobManager
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(RestoreJobManager.class);
+    private static final Object PRESENT = new Object();
 
     private final Map<UUID, RestoreSliceTracker> jobs = new ConcurrentHashMap<>();
+    private final Cache<UUID, Object> deletedJobs;
     private final RestoreProcessor processor;
     private final ExecutorPools executorPools;
     private final InstanceMetadata instanceMetadata;
@@ -80,7 +81,8 @@
         this.instanceMetadata = instanceMetadata;
         this.executorPools = executorPools;
         this.processor = restoreProcessor;
-        // delete obsolete on start up. Once instance is started, the jobDiscoverer will find the jobs to cleanup
+        this.deletedJobs = Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
+        // delete obsolete on start up. Once instance is started, the jobDiscoverer will find the jobs to clean up
         if (deleteOnStart)
         {
             deleteObsoleteDataAsync();
@@ -99,7 +101,7 @@
     throws RestoreJobFatalException
     {
         RestoreSliceTracker tracker = jobs.computeIfAbsent(slice.jobId(),
-                                                           id -> new RestoreSliceTracker(restoreJob, processor));
+                                                           id -> new RestoreSliceTracker(restoreJob, processor, instanceMetadata));
         return tracker.trySubmit(slice);
     }
 
@@ -107,17 +109,17 @@
      * Update the restore job reference in tracker, in order for pending restore slices to read the latest
      * restore job, especially the credentials to download from cloud storage.
      *
-     * @param job restore job to update
+     * @param restoreJob restore job to update
      */
-    void updateRestoreJob(RestoreJob job)
+    void updateRestoreJob(RestoreJob restoreJob)
     {
-        RestoreSliceTracker tracker = jobs.computeIfAbsent(job.jobId,
-                                                           id -> new RestoreSliceTracker(job, processor));
-        tracker.updateRestoreJob(job);
+        RestoreSliceTracker tracker = jobs.computeIfAbsent(restoreJob.jobId,
+                                                           id -> new RestoreSliceTracker(restoreJob, processor, instanceMetadata));
+        tracker.updateRestoreJob(restoreJob);
     }
 
     /**
-     * Remove the tracker of the job when it is completed and delete its data on disk. The method internal.
+     * Remove the tracker of the job when it is completed and delete its data on disk. The method runs async and it for internal use only.
      * It should only be called by the background task, when it discovers the job is
      * in the final {@link org.apache.cassandra.sidecar.common.data.RestoreJobStatus}, i.e. SUCCEEDED or FAILED.
      *
@@ -125,46 +127,52 @@
      */
     void removeJobInternal(UUID jobId)
     {
-        RestoreSliceTracker tracker = jobs.remove(jobId);
-        if (tracker != null)
+        if (deletedJobs.getIfPresent(jobId) == PRESENT)
         {
-            tracker.cleanupInternal();
+            LOGGER.debug("The job is already removed. Skipping. jobId={}", jobId);
+            return;
         }
-        // There might be no tracker, but the job has data on disk.
-        deleteDataOfJobAsync(jobId);
+
+        executorPools
+        .internal()
+        .runBlocking(() -> {
+            RestoreSliceTracker tracker = jobs.remove(jobId);
+            if (tracker != null)
+            {
+                tracker.cleanupInternal();
+            }
+        })
+        .recover(cause -> {
+            // There might be no tracker, but the job has data on disk.
+            LOGGER.warn("Failed to clean up restore job. Recover and proceed to delete the on-disk files. jobId={}", jobId, cause);
+            return Future.succeededFuture();
+        })
+        .compose(v -> deleteDataOfJobAsync(jobId))
+        .onSuccess(v -> deletedJobs.put(jobId, PRESENT));
     }
 
     /**
      * Find obsolete job data on disk and delete them
      * The obsoleteness is determined by {@link RestoreJobConfiguration#jobDiscoveryRecencyDays}
      */
-    Future<Void> deleteObsoleteDataAsync()
+    void deleteObsoleteDataAsync()
     {
-        return findObsoleteJobDataDirs()
-               .compose(pathStream -> {
-                   try (Stream<Path> stream = pathStream)
-                   {
-                       // use 'join' to complete the other deletes, when there is an error
-                       List<Future> deletes = stream.map(this::deleteDataAsync)
-                                                    .collect(Collectors.toList());
-                       return CompositeFuture.join(deletes)
-                              .compose(compositeFuture -> {
-                                  // None of them should fail. Having the branch here for logic completeness
-                                  if (compositeFuture.failed())
-                                  {
-                                      LOGGER.warn("Unexpected error while deleting files.",
-                                                  compositeFuture.cause());
-                                  }
-                                  return Future.<Void>succeededFuture();
-                              });
-                   }
-               })
-               .recover(any -> Future.succeededFuture());
+        findObsoleteJobDataDirs()
+        .compose(pathStream -> executorPools
+                               .internal()
+                               .runBlocking(() -> {
+                                   try (Stream<Path> stream = pathStream)
+                                   {
+                                       stream.forEach(this::deleteRecursively);
+                                   }
+                               }))
+        .onFailure(cause -> LOGGER.warn("Unexpected error while deleting files.", cause));
     }
 
     /**
      * Find the restore job directories that are older than {@link RestoreJobConfiguration#jobDiscoveryRecencyDays}
-     * @return a future of stream of path that should be closed on success. When failed to list, no stream is created.
+     * Note that the returned Stream should be closed by the caller.
+     * @return a future of stream of path. When failed to list, return a failed failure.
      */
     Future<Stream<Path>> findObsoleteJobDataDirs()
     {
@@ -172,23 +180,9 @@
         if (!Files.exists(rootDir))
             return Future.succeededFuture(Stream.empty());
 
-        return executorPools.internal().executeBlocking(promise -> {
-            try
-            {
-                Stream<Path> obsoleteDirs = Files.walk(rootDir, 1)
-                                                 .filter(this::isObsoleteRestoreJobDir);
-                promise.complete(obsoleteDirs);
-            }
-            catch (IOException ioe)
-            {
-                LOGGER.warn("Error on listing restore job data directories.", ioe);
-            }
-            finally
-            {
-                // Ensure the promise is complete. It is a no-op, if the promise is already completed.
-                promise.tryComplete(Stream.empty());
-            }
-        });
+        return executorPools.internal()
+                            .executeBlocking(() -> Files.walk(rootDir, 1)
+                                                        .filter(this::isObsoleteRestoreJobDir));
     }
 
     // Deletes quietly w/o returning failed futures
@@ -204,7 +198,7 @@
             {
                 rootDirs
                 .filter(path -> Files.isDirectory(path) && path.startsWith(prefixedJobId))
-                .forEach(this::deleteDataAsync);
+                .forEach(this::deleteRecursively);
             }
             catch (IOException ioe) // thrown from Files.walk.
             {
@@ -213,21 +207,19 @@
         });
     }
 
-    // Deletes quietly w/o returning failed futures
-    private Future<Void> deleteDataAsync(Path root)
+    // Delete files from the root recursively and quietly w/o throwing any exception
+    private void deleteRecursively(Path root)
     {
-        return executorPools.internal().runBlocking(() -> {
-            try (Stream<Path> pathStream = Files.walk(root))
-            {
-                pathStream
-                .sorted(Comparator.reverseOrder())
-                .forEach(path -> ThrowableUtils.propagate(() -> Files.delete(path)));
-            }
-            catch (Exception exception)
-            {
-                LOGGER.warn("Error on deleting data. Path={}", root, exception);
-            }
-        });
+        try (Stream<Path> pathStream = Files.walk(root))
+        {
+            pathStream
+            .sorted(Comparator.reverseOrder())
+            .forEach(path -> ThrowableUtils.propagate(() -> Files.delete(path)));
+        }
+        catch (Exception exception)
+        {
+            LOGGER.warn("Error on deleting data. Path={}", root, exception);
+        }
     }
 
     // returns true only when all conditions are met
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index f765b9c..36a5d77 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -43,6 +43,8 @@
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
+import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.tasks.PeriodicTask;
 import org.apache.cassandra.sidecar.utils.SSTableImporter;
@@ -66,6 +68,7 @@
     private final RestoreJobUtil restoreJobUtil;
     private final Set<RestoreSliceHandler> activeTasks = ConcurrentHashMap.newKeySet();
     private final long longRunningHandlerThresholdInSeconds;
+    private final LocalTokenRangesProvider localTokenRangesProvider;
     private final SidecarMetrics metrics;
 
     private volatile boolean isClosed = false; // OK to run close twice, so relax the control to volatile
@@ -78,6 +81,7 @@
                             SSTableImporter importer,
                             RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
                             RestoreJobUtil restoreJobUtil,
+                            CachedLocalTokenRanges localTokenRangesProvider,
                             SidecarMetrics metrics)
     {
         this.pool = executorPools.internal();
@@ -92,6 +96,7 @@
         this.importer = importer;
         this.sliceDatabaseAccessor = sliceDatabaseAccessor;
         this.restoreJobUtil = restoreJobUtil;
+        this.localTokenRangesProvider = localTokenRangesProvider;
         this.metrics = metrics;
     }
 
@@ -144,6 +149,7 @@
                                                          requiredUsableSpacePercentage,
                                                          sliceDatabaseAccessor,
                                                          restoreJobUtil,
+                                                         localTokenRangesProvider,
                                                          metrics);
             activeTasks.add(task);
             pool.executeBlocking(task, false) // unordered; run in parallel
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index cddd950..697ad07 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -19,8 +19,13 @@
 package org.apache.cassandra.sidecar.restore;
 
 import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -32,6 +37,7 @@
 import io.vertx.ext.web.handler.HttpException;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.utils.Preconditions;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.db.RestoreJob;
@@ -41,6 +47,7 @@
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
 import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.metrics.RestoreMetrics;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.StopWatch;
@@ -72,6 +79,7 @@
     private final double requiredUsableSpacePercentage;
     private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
     private final RestoreJobUtil restoreJobUtil;
+    private final LocalTokenRangesProvider localTokenRangesProvider;
     private final RestoreMetrics metrics;
     private final InstanceMetrics instanceMetrics;
     private long taskStartTimeNanos = -1;
@@ -83,6 +91,7 @@
                             double requiredUsableSpacePercentage,
                             RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
                             RestoreJobUtil restoreJobUtil,
+                            LocalTokenRangesProvider localTokenRangesProvider,
                             SidecarMetrics metrics)
     {
         Preconditions.checkArgument(!slice.job().isManagedBySidecar()
@@ -95,6 +104,7 @@
         this.requiredUsableSpacePercentage = requiredUsableSpacePercentage;
         this.sliceDatabaseAccessor = sliceDatabaseAccessor;
         this.restoreJobUtil = restoreJobUtil;
+        this.localTokenRangesProvider = localTokenRangesProvider;
         this.metrics = metrics.server().restore();
         this.instanceMetrics = metrics.instance(slice.owner().id());
     }
@@ -105,6 +115,19 @@
     }
 
     @Override
+    public long elapsedInNanos()
+    {
+        return taskStartTimeNanos == -1 ? -1 :
+               currentTimeInNanos() - taskStartTimeNanos;
+    }
+
+    @Override
+    public RestoreSlice slice()
+    {
+        return slice;
+    }
+
+    @Override
     public void handle(Promise<RestoreSlice> event)
     {
         this.taskStartTimeNanos = restoreJobUtil.currentTimeNanos();
@@ -137,10 +160,10 @@
                     // 1. check object existence and validate eTag / checksum
                     return checkObjectExistence(event)
                            .compose(headObject -> downloadSlice(event))
-                           .<Void>compose(file -> {
+                           .compose(file -> {
                                slice.completeStagePhase();
                                sliceDatabaseAccessor.updateStatus(slice);
-                               return Future.succeededFuture();
+                               return Future.<Void>succeededFuture();
                            })
                            // completed staging. A new task is produced when it comes to import
                            .onSuccess(_v -> event.tryComplete(slice))
@@ -326,10 +349,11 @@
 
     private Future<File> unzip(File zipFile)
     {
-        Future<File> future = executorPool.executeBlocking(promise -> {
-            if (failOnCancelled(promise))
-                return;
+        if (slice.isCancelled())
+            return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
+                                                                         slice, null));
 
+        Future<File> future = executorPool.executeBlocking(() -> {
             // targetPathInStaging points to the directory named after uploadId
             // SSTableImporter expects the file system structure to be uploadId/keyspace/table/sstables
             File targetDir = slice.stageDirectory()
@@ -345,14 +369,13 @@
                 {
                     LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task? " +
                                  "jobId={} sliceKey={}", slice.jobId(), slice.key());
-                    promise.complete(targetDir);
+                    // return early
+                    return targetDir;
                 }
                 else
                 {
-                    promise.tryFail(new RestoreJobException("Object not found from disk. File: " + zipFile));
+                    throw new RestoreJobException("Object not found from disk. File: " + zipFile);
                 }
-                // return early
-                return;
             }
 
             try
@@ -362,51 +385,49 @@
                 // The validation step later expects only the files registered in the manifest.
                 RestoreJobUtil.cleanDirectory(targetDir.toPath());
                 RestoreJobUtil.unzip(zipFile, targetDir);
-                // Notify the next step that unzip is complete
-                promise.complete(targetDir);
                 // Then, delete the downloaded zip file
                 if (!zipFile.delete())
                 {
                     LOGGER.warn("File deletion attempt failed. jobId={} sliceKey={} file={}",
                                 slice.jobId(), slice.key(), zipFile.getAbsolutePath());
                 }
+                // Notify the next step that unzip is complete
+                return targetDir;
             }
             catch (Exception cause)
             {
-                promise.tryFail(RestoreJobExceptions.propagate("Failed to unzip. File: " + zipFile, cause));
+                throw RestoreJobExceptions.propagate("Failed to unzip. File: " + zipFile, cause);
             }
         }, false); // unordered
 
         return StopWatch.measureTimeTaken(future, d -> instanceMetrics.restore().sliceUnzipTime.metric.update(d, TimeUnit.NANOSECONDS));
     }
 
-    // Validate integrity of the files from the zip. The failures from any step is fatal and not retryable.
+    // Validate integrity of the files from the zip. If the SSTables that are fully out of the owning range of the node is removed
     private Future<File> validateFiles(File directory)
     {
-        Future<File> future = executorPool.executeBlocking(promise -> {
-            if (failOnCancelled(promise))
-                return;
+        if (slice.isCancelled())
+            return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
+                                                                         slice, null));
 
-            Map<String, String> checksums;
-            try
+        Future<File> future = executorPool.executeBlocking(() -> {
+            File manifestFile = new File(directory, RestoreSliceManifest.MANIFEST_FILE_NAME);
+            RestoreSliceManifest manifest = RestoreSliceManifest.read(manifestFile);
+
+            if (manifest.isEmpty())
             {
-                File manifestFile = new File(directory, RestoreSliceManifest.MANIFEST_FILE_NAME);
-                RestoreSliceManifest manifest = RestoreSliceManifest.read(manifestFile);
-                checksums = manifest.mergeAllChecksums();
-            }
-            catch (RestoreJobFatalException e)
-            {
-                promise.tryFail(e);
-                return;
+                throw new RestoreJobFatalException("The downloaded slice has no data. " +
+                                                   "Directory: " + directory);
             }
 
-            if (checksums.isEmpty())
+            // validate the SSTable ranges with the owning range of the node and remove the out-of-range sstables
+            if (slice.job().isManagedBySidecar())
             {
-                promise.tryFail(new RestoreJobFatalException("The downloaded slice has no data. " +
-                                                             "Directory: " + directory));
-                return;
+                removeOutOfRangeSSTables(directory, manifest);
             }
 
+            Map<String, String> checksums = manifest.mergeAllChecksums();
+
             // exclude the manifest file
             File[] files = directory.listFiles((dir, name) -> !name.equals(RestoreSliceManifest.MANIFEST_FILE_NAME));
             if (files == null || files.length != checksums.size())
@@ -414,11 +435,10 @@
                 String msg = "Number of files does not match. Expected: " + checksums.size() +
                              "; Actual: " + (files == null ? 0 : files.length) +
                              "; Directory: " + directory;
-                promise.tryFail(new RestoreJobFatalException(msg));
-                return;
+                throw new RestoreJobFatalException(msg);
             }
 
-            compareChecksums(checksums, files, promise);
+            compareChecksums(checksums, files);
 
             // capture the data component size of sstables
             for (File file : files)
@@ -430,13 +450,71 @@
             }
 
             // all files match with the provided checksums
-            promise.tryComplete(directory);
+            return directory;
         }, false); // unordered
 
         return StopWatch.measureTimeTaken(future, d -> instanceMetrics.restore().sliceValidationTime.metric.update(d, TimeUnit.NANOSECONDS));
     }
 
-    private void compareChecksums(Map<String, String> expectedChecksums, File[] files, Promise<?> promise)
+    // Remove all the SSTables that does not belong this node
+    // The method modifies the input manifest and delete files under directory, if out of range sstables are found
+    private void removeOutOfRangeSSTables(File directory, RestoreSliceManifest manifest) throws RestoreJobException, IOException
+    {
+        Set<TokenRange> ranges = localTokenRangesProvider.localTokenRanges(slice.keyspace()).get(slice.owner().id());
+        if (ranges == null || ranges.isEmpty())
+        {
+            // Note: retry is allowed for the failure
+            throw new RestoreJobException("Unable to fetch local range, retry later");
+        }
+
+        // 1. remove the sstables that are fully out of range
+        // 2. detect if there is any range that partially overlaps. In that case, signal that this node is required to run nodetool cleanup on job completion
+        Iterator<Map.Entry<String, RestoreSliceManifest.ManifestEntry>> it = manifest.entrySet().iterator();
+        while (it.hasNext())
+        {
+            RestoreSliceManifest.ManifestEntry entry = it.next().getValue();
+            // TokenRange is open-closed, hence subtracting one from the rangeStart read from manifest
+            TokenRange sstableRange = new TokenRange(entry.startToken().subtract(BigInteger.ONE),
+                                                     entry.endToken());
+
+            boolean hasOverlap = false;
+            boolean fullyEnclosed = false;
+            for (TokenRange owningRange : ranges)
+            {
+                if (hasOverlap)
+                {
+                    break;
+                }
+
+                hasOverlap = owningRange.overlaps(sstableRange);
+
+                if (hasOverlap)
+                {
+                    fullyEnclosed = owningRange.encloses(sstableRange);
+                }
+            }
+
+            // fully out of range
+            if (!hasOverlap)
+            {
+                // remove the entry from manifest
+                it.remove();
+                // delete the files
+                for (String fileName : entry.componentsChecksum().keySet())
+                {
+                    Path path = directory.toPath().resolve(fileName);
+                    Files.deleteIfExists(path);
+                }
+            }
+            // overlaps, but is not fully enclosed; we need to run cleanup on this node
+            else if (!fullyEnclosed)
+            {
+                slice.requestOutOfRangeDataCleanup();
+            }
+        }
+    }
+
+    private void compareChecksums(Map<String, String> expectedChecksums, File[] files) throws RestoreJobFatalException
     {
         for (File file : files)
         {
@@ -444,8 +522,7 @@
             String expectedChecksum = expectedChecksums.get(name);
             if (expectedChecksum == null)
             {
-                promise.tryFail(new RestoreJobFatalException("File not found in manifest. File: " + name));
-                return;
+                throw new RestoreJobFatalException("File not found in manifest. File: " + name);
             }
 
             try
@@ -455,14 +532,12 @@
                 {
                     String msg = "Checksum does not match. Expected: " + expectedChecksum +
                                  "; actual: " + actualChecksum + "; file: " + file;
-                    promise.tryFail(new RestoreJobFatalException(msg));
-                    return;
+                    throw new RestoreJobFatalException(msg);
                 }
             }
-            catch (Exception cause)
+            catch (IOException cause)
             {
-                promise.tryFail(new RestoreJobFatalException("Failed to calculate checksum. File: " + file));
-                return;
+                throw new RestoreJobFatalException("Failed to calculate checksum. File: " + file, cause);
             }
         }
     }
@@ -520,17 +595,18 @@
                     httpException.getPayload(), httpException);
     }
 
-    @Override
-    public long elapsedInNanos()
+    // For testing only. Unsafe to call in production code.
+    @VisibleForTesting
+    void removeOutOfRangeSSTablesUnsafe(File directory, RestoreSliceManifest manifest) throws RestoreJobException, IOException
     {
-        return taskStartTimeNanos == -1 ? -1 :
-               currentTimeInNanos() - taskStartTimeNanos;
+        removeOutOfRangeSSTables(directory, manifest);
     }
 
-    @Override
-    public RestoreSlice slice()
+    // For testing only. Unsafe to call in production code.
+    @VisibleForTesting
+    void compareChecksumsUnsafe(Map<String, String> expectedChecksums, File[] files) throws RestoreJobFatalException
     {
-        return slice;
+        compareChecksums(expectedChecksums, files);
     }
 
     /**
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java
index 5905980..c3e042b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java
@@ -26,6 +26,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
 import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
@@ -39,14 +42,17 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(RestoreSliceTracker.class);
 
     private volatile RestoreJob restoreJob;
+    private volatile boolean cleanupOutOfRangeRequested = false;
+    private final InstanceMetadata instanceMetadata;
     private final Map<RestoreSlice, Status> slices = new ConcurrentHashMap<>();
     private final RestoreProcessor processor;
     private final AtomicReference<RestoreJobFatalException> failureRef = new AtomicReference<>();
 
-    public RestoreSliceTracker(RestoreJob restoreJob, RestoreProcessor restoreProcessor)
+    public RestoreSliceTracker(RestoreJob restoreJob, RestoreProcessor restoreProcessor, InstanceMetadata instanceMetadata)
     {
         this.restoreJob = restoreJob;
         this.processor = restoreProcessor;
+        this.instanceMetadata = instanceMetadata;
     }
 
     /**
@@ -102,6 +108,11 @@
         cleanupInternal();
     }
 
+    public void requestOutOfRangeDataCleanup()
+    {
+        cleanupOutOfRangeRequested = true;
+    }
+
     /**
      * Internal method to clean up the {@link RestoreSlice}.
      * It validates the slices and log warnings if they are not in a final state,
@@ -109,8 +120,9 @@
      */
     void cleanupInternal()
     {
+        boolean succeeded = failureRef.get() == null;
         slices.forEach((slice, status) -> {
-            if (failureRef.get() == null && status != Status.COMPLETED)
+            if (succeeded && status != Status.COMPLETED)
             {
                 LOGGER.warn("Clean up pending restore slice when the job has not failed. jobId={}, sliceId={}",
                             restoreJob.jobId, slice.sliceId());
@@ -118,6 +130,34 @@
             slice.cancel();
         });
         slices.clear();
+
+        runOnCompletion();
+    }
+
+    /**
+     * Run operations on restore job completion, including success and failure cases
+     */
+    private void runOnCompletion()
+    {
+        if (cleanupOutOfRangeRequested)
+        {
+            CassandraAdapterDelegate delegate = instanceMetadata.delegate();
+            StorageOperations operations = delegate == null ? null : delegate.storageOperations();
+            if (operations == null)
+            {
+                LOGGER.warn("Out of range data cleanup for the restore job is requested. It failed to start the operation. jobId={}", restoreJob.jobId);
+                return;
+            }
+
+            try
+            {
+                operations.outOfRangeDataCleanup(restoreJob.keyspaceName, restoreJob.tableName);
+            }
+            catch (Throwable cause)
+            {
+                LOGGER.warn("Clean up out of range data has failed", cause);
+            }
+        }
     }
 
     /**
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index 2795b7b..fd31c7c 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -148,13 +148,6 @@
             return failedFuture;
         }
 
-        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
-        GetObjectRequest request =
-        GetObjectRequest.builder()
-                        .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider()))
-                        .bucket(slice.bucket())
-                        .key(slice.key())
-                        .build();
         Path objectPath = slice.stagedObjectPath();
         File object = objectPath.toFile();
         if (object.exists())
@@ -169,12 +162,22 @@
             // For now, we just skip download, assuming the scenario is rare and no maliciousness
             return CompletableFuture.completedFuture(object);
         }
+
         if (!object.getParentFile().mkdirs())
         {
-            LOGGER.warn("Error occurred while creating directory. jobId={} s3_object={}",
+            LOGGER.warn("Error occurred while creating directory. jobId={} s3Object={}",
                         slice.jobId(), slice.stagedObjectPath());
+
         }
-        LOGGER.info("Downloading object. jobId={} s3_object={}", slice.jobId(), slice.stagedObjectPath());
+
+        LOGGER.info("Downloading object. jobId={} s3Object={}", slice.jobId(), slice.stagedObjectPath());
+        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+        GetObjectRequest request =
+        GetObjectRequest.builder()
+                        .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider()))
+                        .bucket(slice.bucket())
+                        .key(slice.key())
+                        .build();
         return rateLimitedGetObject(slice, client, request, objectPath)
                .whenComplete(logCredentialOnRequestFailure(slice, credentials))
                .thenApply(res -> object);
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java
index 59f04e7..5325b2a 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
@@ -29,6 +30,8 @@
 import org.apache.cassandra.testing.CassandraTestContext;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Test to ensure connectivity with the JMX client
@@ -37,46 +40,77 @@
 {
     private static final String SS_OBJ_NAME = "org.apache.cassandra.db:type=StorageService";
 
+    /**
+     * Test jmx connectivity with various operations
+     */
     @CassandraIntegrationTest
     void testJmxConnectivity(CassandraTestContext context) throws IOException
     {
         try (JmxClient jmxClient = createJmxClient(context))
         {
-            String opMode = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
-                                     .getOperationMode();
-            assertThat(opMode).isNotNull();
-            assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", "DECOMMISSIONED", "CLIENT");
+            testGetOperationMode(jmxClient, context.cluster());
 
-            IUpgradeableInstance instance = context.cluster().getFirstRunningInstance();
-            IInstanceConfig config = instance.config();
-            assertThat(jmxClient.host()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress());
-            assertThat(jmxClient.port()).isEqualTo(config.jmxPort());
+            testGossipInfo(jmxClient);
+
+            testCorrectVersion(jmxClient, String.valueOf(context.version.major));
+
+            testTableCleanup(jmxClient, context.cluster());
         }
     }
 
-    @CassandraIntegrationTest
-    void testGossipInfo(CassandraTestContext context) throws IOException
+    private void testGetOperationMode(JmxClient jmxClient, UpgradeableCluster cluster)
     {
-        try (JmxClient jmxClient = createJmxClient(context))
-        {
-            FailureDetector proxy = jmxClient.proxy(FailureDetector.class,
-                                                    "org.apache.cassandra.net:type=FailureDetector");
-            String rawGossipInfo = proxy.getAllEndpointStates();
-            assertThat(rawGossipInfo).isNotEmpty();
-            Map<String, ?> gossipInfoMap = GossipInfoParser.parse(rawGossipInfo);
-            assertThat(gossipInfoMap).isNotEmpty();
-            gossipInfoMap.forEach((key, value) -> GossipInfoParser.isGossipInfoHostHeader(key));
-        }
+        String opMode = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                                 .getOperationMode();
+        assertThat(opMode).isNotNull();
+        assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", "DECOMMISSIONED", "CLIENT");
+
+        IUpgradeableInstance instance = cluster.getFirstRunningInstance();
+        IInstanceConfig config = instance.config();
+        assertThat(jmxClient.host()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress());
+        assertThat(jmxClient.port()).isEqualTo(config.jmxPort());
     }
 
-    @CassandraIntegrationTest
-    void testCorrectVersion(CassandraTestContext context) throws IOException
+    private void testGossipInfo(JmxClient jmxClient)
     {
-        try (JmxClient jmxClient = createJmxClient(context))
+        FailureDetector proxy = jmxClient.proxy(FailureDetector.class,
+                                                "org.apache.cassandra.net:type=FailureDetector");
+        String rawGossipInfo = proxy.getAllEndpointStates();
+        assertThat(rawGossipInfo).isNotEmpty();
+        Map<String, ?> gossipInfoMap = GossipInfoParser.parse(rawGossipInfo);
+        assertThat(gossipInfoMap).isNotEmpty();
+        gossipInfoMap.forEach((key, value) -> GossipInfoParser.isGossipInfoHostHeader(key));
+    }
+
+    private void testCorrectVersion(JmxClient jmxClient, String majorVersion)
+    {
+        String releaseVersion = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                                         .getReleaseVersion();
+        assertThat(releaseVersion).startsWith(majorVersion);
+    }
+
+    // a test to ensure the jmx client can invoke the MBean method
+    private void testTableCleanup(JmxClient jmxClient, UpgradeableCluster cluster)
+    {
+        cluster.schemaChange("CREATE KEYSPACE jmx_client_test WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}");
+        cluster.schemaChange("CREATE TABLE jmx_client_test.table_cleanup ( a int PRIMARY KEY, b int)");
+        cluster.get(1).executeInternal("INSERT INTO jmx_client_test.table_cleanup (a, b) VALUES (1, 1)");
+        cluster.get(1).flush("jmx_client_test");
+        int status = -1;
+        try
         {
-            jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
-                     .refreshSizeEstimates();
+            status = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                              .forceKeyspaceCleanup(1, "jmx_client_test", "table_cleanup");
         }
+        catch (Exception e)
+        {
+            fail("Unexpected exception ", e);
+        }
+        assertThat(status).isZero();
+
+        assertThatThrownBy(() -> jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                                          .forceKeyspaceCleanup(1, "jmx_client_test", "table_not_exist"))
+        .hasMessageContaining("Unknown keyspace/cf pair");
     }
 
     /**
@@ -89,6 +123,8 @@
         void refreshSizeEstimates();
 
         String getReleaseVersion();
+
+        int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables);
     }
 
     /**
@@ -99,7 +135,6 @@
         String getAllEndpointStates();
     }
 
-
     private static JmxClient createJmxClient(CassandraTestContext context)
     {
         IUpgradeableInstance instance = context.cluster().getFirstRunningInstance();
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
index d08131d..604532b 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
@@ -40,8 +40,8 @@
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.TokenSupplier;
-import org.apache.cassandra.sidecar.adapters.base.Partitioner;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.AbstractCassandraTestContext;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java
index aff80a5..deb84af 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java
@@ -42,8 +42,8 @@
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
-import org.apache.cassandra.sidecar.adapters.base.Partitioner;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 
 import static org.assertj.core.api.Assertions.assertThat;
diff --git a/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java b/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java
index d3f065b..6085525 100644
--- a/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.sidecar.concurrent;
 
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -35,6 +33,7 @@
 import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
 import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -44,7 +43,7 @@
 /**
  * Test {@link ExecutorPools}
  */
-public class ExecutorPoolsTest
+class ExecutorPoolsTest
 {
     private ExecutorPools pools;
     private SidecarMetrics metrics;
@@ -69,7 +68,7 @@
     }
 
     @Test
-    public void testClosingExecutorPoolShouldThrow()
+    void testClosingExecutorPoolShouldThrow()
     {
         assertThatThrownBy(() -> pools.service().close())
         .hasMessage("Closing TaskExecutorPool is not supported!")
@@ -81,9 +80,37 @@
     }
 
     @Test
-    public void testOrdered()
+    void testExecutionOrder()
     {
-        // not thread-safe
+        testExecutionOrder(true, true);
+        testExecutionOrder(false, true);
+        testExecutionOrder(true, false);
+        testExecutionOrder(false, false);
+    }
+
+    @Test
+    void testMetricCapture()
+    {
+        TaskExecutorPool pool = pools.internal();
+        int total = 100;
+        CountDownLatch stop = new CountDownLatch(total);
+        for (int i = 0; i < total; i++)
+        {
+            pool.runBlocking(() -> stop.countDown());
+        }
+
+        assertThat(Uninterruptibles.awaitUninterruptibly(stop, 10, TimeUnit.SECONDS))
+        .describedAs("Test should finish in 10 seconds")
+        .isTrue();
+
+        // there could be some delay to read the metric that reflects the last task. If so, retry the assertion for at most 2 seconds
+        loopAssert(2,
+                   () -> assertThat(metrics.server().resource().internalTaskTime.metric.getCount()).isEqualTo(total));
+    }
+
+    private void testExecutionOrder(boolean orderedSubmission, boolean orderedExecution)
+    {
+        // not thread-safe deliberated
         class IntWrapper
         {
             int i = 0;
@@ -97,25 +124,33 @@
         TaskExecutorPool pool = pools.internal();
         IntWrapper v = new IntWrapper();
         int total = 100;
+        CountDownLatch ready = new CountDownLatch(1);
         CountDownLatch stop = new CountDownLatch(total);
-        Set<String> threadNames = new HashSet<>();
         for (int i = 0; i < total; i++)
         {
-            // Start 100 parallel executions that each submits the ordered execution
-            pool.executeBlocking(promise -> {
-                pool.executeBlocking(p -> {
+            // Start 100 executions that each submits the ordered execution
+            pool.runBlocking(() -> {
+                Uninterruptibles.awaitUninterruptibly(ready);
+                pool.runBlocking(() -> {
                     v.increment();
-                    threadNames.add(Thread.currentThread().getName());
                     stop.countDown();
-                    assertThat(metrics.server().resource().internalTaskTime.metric.getCount()).isEqualTo(200);
-                }, true);
-            }, false);
+                }, orderedExecution);
+            }, orderedSubmission);
         }
+        ready.countDown();
 
         assertThat(Uninterruptibles.awaitUninterruptibly(stop, 10, TimeUnit.SECONDS))
         .describedAs("Test should finish in 10 seconds")
         .isTrue();
+
         // Although IntWrapper is not thread safe, the serial execution (ordered) prevents any race condition.
-        assertThat(v.i).isEqualTo(total);
+        if (orderedExecution)
+        {
+            assertThat(v.i).isEqualTo(total);
+        }
+        else // if execution is unordered, the output is likely less than total due to race
+        {
+            assertThat(v.i).isLessThanOrEqualTo(total);
+        }
     }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/locator/TokenRangeTest.java b/src/test/java/org/apache/cassandra/sidecar/locator/TokenRangeTest.java
deleted file mode 100644
index 4671f18..0000000
--- a/src/test/java/org/apache/cassandra/sidecar/locator/TokenRangeTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.locator;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.Token;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-class TokenRangeTest
-{
-    @Test
-    void testEquals()
-    {
-        TokenRange r1 = new TokenRange(1, 100);
-        TokenRange r2 = new TokenRange(1, 100);
-        TokenRange r3 = new TokenRange(-10, 10);
-        assertThat(r1).isEqualTo(r2);
-        assertThat(r1).isEqualTo(r1);
-        assertThat(r1).isNotEqualTo(r3);
-        assertThat(r3).isNotEqualTo(r1);
-    }
-
-    @Test
-    void testCreateFromJavaDriverTokenRange()
-    {
-        com.datastax.driver.core.TokenRange ordinaryRange = mockRange(1L, 100L);
-        when(ordinaryRange.isWrappedAround()).thenReturn(false);
-        when(ordinaryRange.unwrap()).thenCallRealMethod();
-        List<TokenRange> ranges = TokenRange.from(ordinaryRange);
-        assertThat(ranges).hasSize(1)
-                          .isEqualTo(Collections.singletonList(new TokenRange(1, 100)));
-    }
-
-    @Test
-    void testCreateFromWraparoundJavaDriverTokenRange()
-    {
-        com.datastax.driver.core.TokenRange range = mockRange(10L, -10L);
-        List<com.datastax.driver.core.TokenRange> unwrapped = Arrays.asList(mockRange(10L, Long.MIN_VALUE),
-                                                                            mockRange(Long.MIN_VALUE, -10L));
-        when(range.unwrap()).thenReturn(unwrapped);
-        List<TokenRange> ranges = TokenRange.from(range);
-        assertThat(ranges).hasSize(2)
-                          .isEqualTo(Arrays.asList(new TokenRange(10, Long.MIN_VALUE),
-                                                   new TokenRange(Long.MIN_VALUE, -10L)));
-    }
-
-    private com.datastax.driver.core.TokenRange mockRange(long start, long end)
-    {
-        com.datastax.driver.core.TokenRange range = mock(com.datastax.driver.core.TokenRange.class);
-        Token startToken = mockToken(start);
-        when(range.getStart()).thenReturn(startToken);
-        Token endToken = mockToken(end);
-        when(range.getEnd()).thenReturn(endToken);
-        return range;
-    }
-
-    private Token mockToken(long value)
-    {
-        Token token = mock(Token.class);
-        when(token.getType()).thenReturn(DataType.bigint());
-        when(token.getValue()).thenReturn(value);
-        return token;
-    }
-}
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
index 51ad81e..89729a3 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
@@ -148,7 +148,8 @@
 
         manager.removeJobInternal(slice.jobId()); // it cancels the non-completed slices
 
-        assertThat(slice.isCancelled()).isTrue();
+        // removeJobInternal runs async. Wait for at most 2 seconds for the slice to be cancelled
+        loopAssert(2, () -> assertThat(slice.isCancelled()).isTrue());
     }
 
     @Test
@@ -252,6 +253,7 @@
         RestoreSlice slice = RestoreSlice
                              .builder()
                              .jobId(job.jobId)
+                             .sliceId("testSliceId")
                              .bucketId((short) 0)
                              .stageDirectory(testDir, "uploadId")
                              .storageKey("storageKey")
@@ -260,7 +262,7 @@
                              .replicaStatus(Collections.emptyMap())
                              .replicas(Collections.emptySet())
                              .build();
-        RestoreSliceTracker tracker = new RestoreSliceTracker(job, mock(RestoreProcessor.class));
+        RestoreSliceTracker tracker = new RestoreSliceTracker(job, mock(RestoreProcessor.class), owner);
         slice.registerTracker(tracker);
         return slice;
     }
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
index d5812dd..e878fa2 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
@@ -215,7 +215,7 @@
                                    .jobStatus(RestoreJobStatus.CREATED)
                                    .build();
         when(slice.job()).thenReturn(job);
-        when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any())).thenReturn(
+        when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any(), any())).thenReturn(
         new RestoreSliceHandler()
         {
             private Long startTime = timeInNanosSupplier.get();
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 0c7dfc9..0d2a0c4 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -20,14 +20,22 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
+import java.util.stream.IntStream;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -40,6 +48,7 @@
 import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
@@ -49,14 +58,18 @@
 import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
+import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceRestoreMetrics;
+import org.apache.cassandra.sidecar.restore.RestoreSliceManifest.ManifestEntry;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.SSTableImporter;
+import org.apache.cassandra.sidecar.utils.XXHash32Provider;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 
@@ -64,7 +77,9 @@
 import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -77,6 +92,7 @@
     private TaskExecutorPool executorPool;
     private SidecarMetrics metrics;
     private TestRestoreSliceAccessor sliceDatabaseAccessor;
+    private LocalTokenRangesProvider localTokenRangesProvider;
     private RestoreJobUtil util;
 
     @BeforeEach
@@ -99,7 +115,8 @@
         when(mockRegistryFactory.getOrCreate()).thenReturn(registry());
         when(mockRegistryFactory.getOrCreate(1)).thenReturn(registry(1));
         metrics = new SidecarMetricsImpl(mockRegistryFactory, mockInstanceMetadataFetcher);
-        util = mock(RestoreJobUtil.class);
+        localTokenRangesProvider = mock(LocalTokenRangesProvider.class);
+        util = new RestoreJobUtil(new XXHash32Provider());
         sliceDatabaseAccessor = new TestRestoreSliceAccessor();
     }
 
@@ -340,6 +357,95 @@
         assertThat(task.elapsedInNanos()).isEqualTo(123L);
     }
 
+    @Test
+    void testRemoveOutOfRangeSSTables(@TempDir Path tempDir) throws RestoreJobException, IOException
+    {
+        // TODO: update test to use replica ranges implementation
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED, "QUORUM");
+        RestoreSliceTask task = createTask(mockSlice, job);
+
+        // the mocked localTokenRangesProvider returns null, so retry later
+        RestoreSliceManifest manifest = new RestoreSliceManifest();
+        assertThatThrownBy(() -> task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest))
+        .isExactlyInstanceOf(RestoreJobException.class)
+        .hasMessageContaining("Unable to fetch local range, retry later");
+
+        // enclosed in the node's owning range: [1, 10] is fully enclosed in (0, 100]
+        // it should not remove the manifest entry, and no cleanup is needed
+        Map<Integer, Set<TokenRange>> localRanges = new HashMap<>(1);
+        Set<TokenRange> nodeRanges = new HashSet<>();
+        nodeRanges.add(new TokenRange(0, 100)); // not using vnode, so a single range
+        localRanges.put(1, nodeRanges); // instance id is 1. See setup()
+        when(localTokenRangesProvider.localTokenRanges(any())).thenReturn(localRanges);
+        ManifestEntry rangeEnclosed = new ManifestEntry(Collections.emptyMap(),
+                                                        BigInteger.valueOf(1), // start
+                                                        BigInteger.valueOf(10)); // end
+        manifest.put("foo-", rangeEnclosed);
+        task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest);
+        assertThat(manifest).hasSize(1);
+        verify(mockSlice, times(0)).requestOutOfRangeDataCleanup();
+
+        // fully out of range: [-10, 0] is fully out of range of (0, 100]
+        // it should remove the manifest entry entirely; no clean up required
+        manifest.clear();
+        ManifestEntry outOfRange = new ManifestEntry(Collections.emptyMap(),
+                                                     BigInteger.valueOf(-10), // start
+                                                     BigInteger.valueOf(0)); // end
+        manifest.put("foo-", outOfRange);
+        task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest);
+        assertThat(manifest).isEmpty();
+        verify(mockSlice, times(0)).requestOutOfRangeDataCleanup();
+
+        // partially out of range: [-10, 10] is partially out of range of (0, 100]
+        // it should not remove the manifest entry, but it should signal to request out of range data cleanup
+        manifest.clear();
+        ManifestEntry partiallyOutOfRange = new ManifestEntry(Collections.emptyMap(),
+                                                              BigInteger.valueOf(-10), // start
+                                                              BigInteger.valueOf(10)); // end
+        manifest.put("foo-", partiallyOutOfRange);
+        task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest);
+        assertThat(manifest).hasSize(1);
+        verify(mockSlice, times(1)).requestOutOfRangeDataCleanup();
+    }
+
+    @Test
+    void testCompareChecksum(@TempDir Path tempDir) throws RestoreJobFatalException, IOException
+    {
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED);
+        RestoreSliceTask task = createTask(mockSlice, job);
+
+        byte[] bytes = "Hello".getBytes(StandardCharsets.UTF_8);
+        File[] testFiles = IntStream.range(0, 10).mapToObj(i -> new File(tempDir.toFile(), "f" + i))
+                                    .map(f -> ThrowableUtils.propagate(() -> Files.write(f.toPath(), bytes)).toFile())
+                                    .toArray(File[]::new);
+        Map<String, String> expectedChecksums = new HashMap<>(10);
+        for (File f : testFiles)
+        {
+            expectedChecksums.put(f.getName(), util.checksum(f));
+        }
+
+        assertThat(expectedChecksums)
+        .hasSize(10)
+        .containsEntry("f0", "f206d28f"); // hash value for "Hello"
+
+        // it should not throw
+        task.compareChecksumsUnsafe(expectedChecksums, testFiles);
+
+        // test check with file that does not exist
+        Map<String, String> nonexistFileChecksums = new HashMap<>(10);
+        nonexistFileChecksums.put("non-exist-file", "hash");
+        assertThatThrownBy(() -> task.compareChecksumsUnsafe(nonexistFileChecksums, testFiles))
+        .isInstanceOf(RestoreJobFatalException.class)
+        .hasMessageContaining("File not found in manifest");
+
+        // test check with invalid checksum value
+        Map<String, String> invalidChecksums = new HashMap<>(expectedChecksums);
+        invalidChecksums.put("f0", "invalid_hash"); // modify the hash of the file
+        assertThatThrownBy(() -> task.compareChecksumsUnsafe(invalidChecksums, testFiles))
+        .isInstanceOf(RestoreJobFatalException.class)
+        .hasMessageContaining("Checksum does not match. Expected: invalid_hash; actual: f206d28f");
+    }
+
     private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job)
     {
         return createTask(slice, job, System::nanoTime);
@@ -351,10 +457,10 @@
         assertThat(slice.job()).isSameAs(job);
         assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
         assertThat(slice.job().status).isEqualTo(job.status);
-        RestoreJobUtil util = mock(RestoreJobUtil.class);
-        when(util.currentTimeNanos()).thenAnswer(invok -> currentNanoTimeSupplier.get());
+        RestoreJobUtil spiedUtil = spy(util);
+        when(spiedUtil.currentTimeNanos()).thenAnswer(invok -> currentNanoTimeSupplier.get());
         return new TestRestoreSliceTask(slice, mockStorageClient, executorPool, mockSSTableImporter,
-                                        0, sliceDatabaseAccessor, util, metrics);
+                                        0, sliceDatabaseAccessor, spiedUtil, localTokenRangesProvider, metrics);
     }
 
     private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, RestoreJob job)
@@ -365,7 +471,7 @@
         assertThat(slice.job().status).isEqualTo(job.status);
         return new TestUnexpectedExceptionInRestoreSliceTask(slice, mockStorageClient, executorPool,
                                                              mockSSTableImporter, 0, sliceDatabaseAccessor,
-                                                             util, metrics);
+                                                             util, localTokenRangesProvider, metrics);
     }
 
     static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor
@@ -393,10 +499,11 @@
         public TestRestoreSliceTask(RestoreSlice slice, StorageClient s3Client, TaskExecutorPool executorPool,
                                     SSTableImporter importer, double requiredUsableSpacePercentage,
                                     RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobUtil restoreJobUtil,
+                                    LocalTokenRangesProvider localTokenRangesProvider,
                                     SidecarMetrics metrics)
         {
             super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage,
-                  sliceDatabaseAccessor, restoreJobUtil, metrics);
+                  sliceDatabaseAccessor, restoreJobUtil, localTokenRangesProvider, metrics);
             this.slice = slice;
             this.instanceMetrics = metrics.instance(slice.owner().id());
         }
@@ -429,10 +536,12 @@
                                                          TaskExecutorPool executorPool, SSTableImporter importer,
                                                          double requiredUsableSpacePercentage,
                                                          RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
-                                                         RestoreJobUtil util, SidecarMetrics metrics)
+                                                         RestoreJobUtil util,
+                                                         LocalTokenRangesProvider localTokenRangesProvider,
+                                                         SidecarMetrics metrics)
         {
             super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage,
-                  sliceDatabaseAccessor, util, metrics);
+                  sliceDatabaseAccessor, util, localTokenRangesProvider, metrics);
         }
 
         @Override
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
index 3eb94a0..5a78fc2 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
@@ -18,14 +18,12 @@
 
 package org.apache.cassandra.sidecar.routes.restore;
 
-import java.math.BigInteger;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
-import com.google.common.collect.Range;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -45,6 +43,7 @@
 import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
 import org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
@@ -153,7 +152,7 @@
         testRestoreSlices.updateStatusFunc = func;
     }
 
-    protected void mockLookupRestoreSlices(BiFunction<UUID, Range<BigInteger>, List<RestoreSlice>> func)
+    protected void mockLookupRestoreSlices(BiFunction<UUID, TokenRange, List<RestoreSlice>> func)
     {
         testRestoreSlices.selectByJobByRangeFunc = func;
     }
@@ -206,7 +205,7 @@
         {
             Function<RestoreSlice, RestoreSlice> createFunc;
             Function<RestoreSlice, RestoreSlice> updateStatusFunc;
-            BiFunction<UUID, Range<BigInteger>, List<RestoreSlice>> selectByJobByRangeFunc;
+            BiFunction<UUID, TokenRange, List<RestoreSlice>> selectByJobByRangeFunc;
 
             TestRestoreSliceDatabaseAccessor(SidecarSchema sidecarSchema)
             {
@@ -226,10 +225,9 @@
             }
 
             @Override
-            public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, short bucketId,
-                                                                      BigInteger startToken, BigInteger endToken)
+            public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, short bucketId, TokenRange range)
             {
-                return selectByJobByRangeFunc.apply(jobId, Range.closed(startToken, endToken));
+                return selectByJobByRangeFunc.apply(jobId, range);
             }
         }
 
