CASSANDRASC-97: Add support for additional digest validation during SSTable upload

In this commit we add the ability to support additional digest algorithms for verification
during SSTable uploads. We introduce the `DigestVerifierFactory` which now supports
XXHash32 and MD5 `DigestVerifier`s.

This commit also adds support for XXHash32 digests. Clients can now send the XXHash32 digest
instead of MD5. This would allow both the clients and server the flexibility to utilize a more
performant algorithm.

Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-97
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b5a9b9..3f1b4cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Add support for additional digest validation during SSTable upload (CASSANDRASC-97)
  * Add sidecar client changes for restore from S3 (CASSANDRASC-95)
  * Add restore SSTables from S3 into Cassandra feature to Cassandra Sidecar (CASSANDRASC-92)
  * Define routing order for http routes (CASSANDRASC-93)
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 9433e3a..7af57e9 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -44,6 +44,7 @@
 import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
 import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
 import org.apache.cassandra.sidecar.client.selection.SingleInstanceSelectionPolicy;
+import org.apache.cassandra.sidecar.common.data.Digest;
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
 import org.jetbrains.annotations.Nullable;
 
@@ -449,14 +450,14 @@
          * @param tableName the table name in Cassandra
          * @param uploadId  an identifier for the upload
          * @param component SSTable component being uploaded
-         * @param checksum  hash value to check integrity of SSTable component uploaded
+         * @param digest    digest value to check integrity of SSTable component uploaded
          * @param filename  the path to the file to be uploaded
          * @return a reference to this Builder
          */
         public Builder uploadSSTableRequest(String keyspace, String tableName, String uploadId, String component,
-                                            String checksum, String filename)
+                                            Digest digest, String filename)
         {
-            return request(new UploadSSTableRequest(keyspace, tableName, uploadId, component, checksum, filename));
+            return request(new UploadSSTableRequest(keyspace, tableName, uploadId, component, digest, filename));
         }
 
         /**
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 3048808..42b7093 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -45,6 +45,7 @@
 import org.apache.cassandra.sidecar.common.data.CreateRestoreJobRequestPayload;
 import org.apache.cassandra.sidecar.common.data.CreateRestoreJobResponsePayload;
 import org.apache.cassandra.sidecar.common.data.CreateSliceRequestPayload;
+import org.apache.cassandra.sidecar.common.data.Digest;
 import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.data.HealthResponse;
 import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
@@ -391,7 +392,7 @@
      * @param table         the table name in Cassandra
      * @param uploadId      the unique identifier for the upload
      * @param componentName the name of the SSTable component
-     * @param checksum      hash value to check integrity of SSTable component uploaded
+     * @param digest        digest value to check integrity of SSTable component uploaded
      * @param filename      the path to the file to be uploaded
      * @return a completable future for the request
      */
@@ -400,7 +401,7 @@
                                                         String table,
                                                         String uploadId,
                                                         String componentName,
-                                                        String checksum,
+                                                        Digest digest,
                                                         String filename)
     {
         return executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
@@ -408,7 +409,7 @@
                                                                                   table,
                                                                                   uploadId,
                                                                                   componentName,
-                                                                                  checksum,
+                                                                                  digest,
                                                                                   filename)
                                                             .build());
     }
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java b/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
index ea0fe98..b93c82a 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
@@ -25,16 +25,16 @@
 import java.util.Map;
 import java.util.Objects;
 
-import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpMethod;
 import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.Digest;
 
 /**
  * Represents a request to upload an SSTable component
  */
 public class UploadSSTableRequest extends Request implements UploadableRequest
 {
-    private final String expectedChecksum;
+    private final Digest digest;
     private final String filename;
 
     /**
@@ -44,14 +44,14 @@
      * @param table     the table name in Cassandra
      * @param uploadId  an identifier for the upload
      * @param component SSTable component being uploaded
-     * @param checksum  hash value to check integrity of SSTable component uploaded
+     * @param digest    digest value to check integrity of SSTable component uploaded
      * @param filename  the path to the file to be uploaded
      */
     public UploadSSTableRequest(String keyspace, String table, String uploadId, String component,
-                                String checksum, String filename)
+                                Digest digest, String filename)
     {
         super(requestURI(keyspace, table, uploadId, component));
-        this.expectedChecksum = checksum;
+        this.digest = digest;
         this.filename = Objects.requireNonNull(filename, "the filename is must be non-null");
 
         if (!Files.exists(Paths.get(filename)))
@@ -72,12 +72,12 @@
     @Override
     public Map<String, String> headers()
     {
-        if (expectedChecksum == null)
+        if (digest == null)
         {
             return super.headers();
         }
         Map<String, String> headers = new HashMap<>(super.headers());
-        headers.put(HttpHeaderNames.CONTENT_MD5.toString(), expectedChecksum);
+        headers.putAll(digest.headers());
         return Collections.unmodifiableMap(headers);
     }
 
diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index fb1102c..e06b709 100644
--- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -63,6 +63,7 @@
 import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.data.HealthResponse;
 import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
 import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import org.apache.cassandra.sidecar.common.data.RingEntry;
 import org.apache.cassandra.sidecar.common.data.RingResponse;
@@ -70,6 +71,7 @@
 import org.apache.cassandra.sidecar.common.data.SchemaResponse;
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
 import org.apache.cassandra.sidecar.foundation.RestoreJobSecretsGen;
 
@@ -78,6 +80,8 @@
 import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpResponseStatus.PARTIAL_CONTENT;
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatException;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -689,7 +693,7 @@
     }
 
     @Test
-    void testUploadSSTableWithoutChecksum(@TempDir Path tempDirectory) throws Exception
+    void testUploadSSTableWithoutDigest(@TempDir Path tempDirectory) throws Exception
     {
         Path fileToUpload = prepareFile(tempDirectory);
         try (MockWebServer server = new MockWebServer())
@@ -722,7 +726,7 @@
     }
 
     @Test
-    void testUploadSSTableWithChecksum(@TempDir Path tempDirectory) throws Exception
+    void testUploadSSTableWithMD5Digest(@TempDir Path tempDirectory) throws Exception
     {
         Path fileToUpload = prepareFile(tempDirectory);
         try (MockWebServer server = new MockWebServer())
@@ -735,7 +739,7 @@
                                         "cyclist_name",
                                         "0000-0000",
                                         "nb-1-big-TOC.txt",
-                                        "15a69dc6501aa5ae17af037fe053f610",
+                                        new MD5Digest("15a69dc6501aa5ae17af037fe053f610"),
                                         fileToUpload.toString())
                   .get(30, TimeUnit.SECONDS);
 
@@ -756,6 +760,76 @@
     }
 
     @Test
+    void testUploadSSTableWithXXHashDigest(@TempDir Path tempDirectory) throws Exception
+    {
+        Path fileToUpload = prepareFile(tempDirectory);
+        try (MockWebServer server = new MockWebServer())
+        {
+            server.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+            SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server);
+            client.uploadSSTableRequest(sidecarInstance,
+                                        "cycling",
+                                        "cyclist_name",
+                                        "0000-0000",
+                                        "nb-1-big-TOC.txt",
+                                        new XXHash32Digest("15a69dc6501aa5ae17af037fe053f610"),
+                                        fileToUpload.toString())
+                  .get(30, TimeUnit.SECONDS);
+
+            assertThat(server.getRequestCount()).isEqualTo(1);
+            RecordedRequest request = server.takeRequest();
+            assertThat(request.getPath())
+            .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE
+                       .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000")
+                       .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "cycling")
+                       .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name")
+                       .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt"));
+            assertThat(request.getMethod()).isEqualTo("PUT");
+            assertThat(request.getHeader(CONTENT_XXHASH32))
+            .isEqualTo("15a69dc6501aa5ae17af037fe053f610");
+            assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isNull();
+            assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80");
+            assertThat(request.getBodySize()).isEqualTo(80);
+        }
+    }
+
+    @Test
+    void testUploadSSTableWithXXHashDigestAndSeed(@TempDir Path tempDirectory) throws Exception
+    {
+        Path fileToUpload = prepareFile(tempDirectory);
+        try (MockWebServer server = new MockWebServer())
+        {
+            server.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+            SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server);
+            client.uploadSSTableRequest(sidecarInstance,
+                                        "cycling",
+                                        "cyclist_name",
+                                        "0000-0000",
+                                        "nb-1-big-TOC.txt",
+                                        new XXHash32Digest("15a69dc6501aa5ae17af037fe053f610", "123456"),
+                                        fileToUpload.toString())
+                  .get(30, TimeUnit.SECONDS);
+
+            assertThat(server.getRequestCount()).isEqualTo(1);
+            RecordedRequest request = server.takeRequest();
+            assertThat(request.getPath())
+            .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE
+                       .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000")
+                       .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "cycling")
+                       .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name")
+                       .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt"));
+            assertThat(request.getMethod()).isEqualTo("PUT");
+            assertThat(request.getHeader(CONTENT_XXHASH32))
+            .isEqualTo("15a69dc6501aa5ae17af037fe053f610");
+            assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isEqualTo("123456");
+            assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80");
+            assertThat(request.getBodySize()).isEqualTo(80);
+        }
+    }
+
+    @Test
     void testStreamSSTableComponentWithNoRange() throws Exception
     {
         try (MockWebServer server = new MockWebServer())
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
new file mode 100644
index 0000000..1d4f432
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.Map;
+
+/**
+ * Interface that represents a checksum digest
+ */
+public interface Digest
+{
+    /**
+     * @return headers to be used in the HTTP request
+     */
+    Map<String, String> headers();
+
+    /**
+     * @return the string representation of the digest
+     */
+    String value();
+
+    /**
+     * @return the name of the digest's algorithm
+     */
+    String algorithm();
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java
new file mode 100644
index 0000000..ba8b7b5
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implements the MD5 checksum digest
+ */
+public class MD5Digest implements Digest
+{
+    private final @NotNull String value;
+
+    /**
+     * Constructs a new MD5Digest with the provided MD5 {@code value}
+     *
+     * @param value the MD5 value
+     */
+    public MD5Digest(@NotNull String value)
+    {
+        this.value = Objects.requireNonNull(value, "value is required");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String value()
+    {
+        return value;
+    }
+
+    @Override
+    public String algorithm()
+    {
+        return "MD5";
+    }
+
+    /**
+     * @return MD5 headers for the Digest
+     */
+    @Override
+    public Map<String, String> headers()
+    {
+        return Collections.singletonMap(HttpHeaderNames.CONTENT_MD5.toString(), value);
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
new file mode 100644
index 0000000..48127e4
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
+
+/**
+ * Implements the XXHash32 Digest
+ */
+public class XXHash32Digest implements Digest
+{
+    private final @NotNull String value;
+    private final @Nullable String seedHex;
+
+    /**
+     * Constructs a new XXHashDigest with the provided XXHash {@code value}
+     *
+     * @param value the xxhash value
+     */
+    public XXHash32Digest(String value)
+    {
+        this(value, null);
+    }
+
+    /**
+     * Constructs a new instance with the provided XXHash {@code value} and the {@code seed} value.
+     *
+     * @param value the xxhash value
+     * @param seed  the seed
+     */
+    public XXHash32Digest(String value, int seed)
+    {
+        this(value, Integer.toHexString(seed));
+    }
+
+    /**
+     * Constructs a new XXHashDigest with the provided XXHash {@code value} and the seed value represented as
+     * a hexadecimal string
+     *
+     * @param value   the xxhash value
+     * @param seedHex the value of the seed represented as a hexadecimal value
+     */
+    public XXHash32Digest(@NotNull String value, @Nullable String seedHex)
+    {
+        this.value = Objects.requireNonNull(value, "value is required");
+        this.seedHex = seedHex;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String value()
+    {
+        return value;
+    }
+
+    /**
+     * @return the optional seed in hexadecimal format
+     */
+    public @Nullable String seedHex()
+    {
+        return seedHex;
+    }
+
+    @Override
+    public String algorithm()
+    {
+        return "XXHash32";
+    }
+
+    /**
+     * @return XXHash headers for the digest
+     */
+    @Override
+    public Map<String, String> headers()
+    {
+        Map<String, String> headers = new HashMap<>();
+        headers.put(CONTENT_XXHASH32, value);
+        if (seedHex != null)
+        {
+            headers.put(CONTENT_XXHASH32_SEED, seedHex);
+        }
+        return headers;
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java b/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
new file mode 100644
index 0000000..347ed59
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.http;
+
+/**
+ * Custom header names for sidecar
+ */
+public final class SidecarHttpHeaderNames
+{
+    /**
+     * {@code "cassandra-content-xxhash32"}
+     */
+    public static final String CONTENT_XXHASH32 = "cassandra-content-xxhash32";
+    /**
+     * {@code "cassandra-content-xxhash32-seed"}
+     */
+    public static final String CONTENT_XXHASH32_SEED = "cassandra-content-xxhash32-seed";
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java b/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
index cef1491..60cfbd2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
+++ b/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.sidecar.data;
 
-import io.netty.handler.codec.http.HttpHeaderNames;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.common.data.SSTableUploads;
@@ -29,7 +28,6 @@
 public class SSTableUploadRequest extends SSTableUploads
 {
     private final String component;
-    private final String expectedChecksum;
 
     /**
      * Constructs an SSTableUploadRequest
@@ -37,16 +35,13 @@
      * @param qualifiedTableName the qualified table name in Cassandra
      * @param uploadId           an identifier for the upload
      * @param component          SSTable component being uploaded
-     * @param expectedChecksum   expected hash value to check integrity of SSTable component uploaded
      */
     public SSTableUploadRequest(QualifiedTableName qualifiedTableName,
                                 String uploadId,
-                                String component,
-                                String expectedChecksum)
+                                String component)
     {
         super(qualifiedTableName, uploadId);
         this.component = component;
-        this.expectedChecksum = expectedChecksum;
     }
 
     /**
@@ -58,14 +53,6 @@
     }
 
     /**
-     * @return expected checksum value of SSTable component
-     */
-    public String expectedChecksum()
-    {
-        return this.expectedChecksum;
-    }
-
-    /**
      * {@inheritDoc}
      */
     public String toString()
@@ -75,7 +62,6 @@
                ", keyspace='" + keyspace() + '\'' +
                ", tableName='" + table() + '\'' +
                ", component='" + component + '\'' +
-               ", expectedChecksum='" + expectedChecksum + '\'' +
                '}';
     }
 
@@ -90,7 +76,6 @@
     {
         return new SSTableUploadRequest(qualifiedTableName,
                                         context.pathParam("uploadId"),
-                                        context.pathParam("component"),
-                                        context.request().getHeader(HttpHeaderNames.CONTENT_MD5.toString()));
+                                        context.pathParam("component"));
     }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 918d8c0..8828c8f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -115,11 +115,21 @@
      */
     public static String checksum(File file) throws IOException
     {
+        int seed = 0x9747b28c; // random seed for initializing
+        return checksum(file, seed);
+    }
+
+    /**
+     * @param file the file to use to perform the checksum
+     * @param seed the seed to use for the hasher
+     * @return the checksum hex string of the file's content. XXHash32 is employed as the hash algorithm.
+     */
+    public static String checksum(File file, int seed) throws IOException
+    {
         try (FileInputStream fis = new FileInputStream(file))
         {
             // might have shared hashers with ThreadLocal
             XXHashFactory factory = XXHashFactory.safeInstance();
-            int seed = 0x9747b28c; // random seed for initializing
             try (StreamingXXHash32 hasher = factory.newStreamingHash32(seed))
             {
                 byte[] buffer = new byte[KB_512];
@@ -128,7 +138,7 @@
                 {
                     hasher.update(buffer, 0, len);
                 }
-                return Long.toHexString(hasher.getValue()); 
+                return Long.toHexString(hasher.getValue());
             }
         }
     }
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
index dc23141..9347f7f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
@@ -42,6 +42,8 @@
 import org.apache.cassandra.sidecar.stats.SSTableStats;
 import org.apache.cassandra.sidecar.stats.SidecarStats;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.DigestVerifier;
+import org.apache.cassandra.sidecar.utils.DigestVerifierFactory;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.MetadataUtils;
 import org.apache.cassandra.sidecar.utils.SSTableUploader;
@@ -62,6 +64,7 @@
     private final SSTableUploadsPathBuilder uploadPathBuilder;
     private final ConcurrencyLimiter limiter;
     private final SSTableStats stats;
+    private final DigestVerifierFactory digestVerifierFactory;
 
     /**
      * Constructs a handler with the provided params.
@@ -74,6 +77,7 @@
      * @param executorPools        executor pools for blocking executions
      * @param validator            a validator instance to validate Cassandra-specific input
      * @param sidecarStats         an interface holding all stats related to main sidecar process
+     * @param digestVerifierFactory a factory of checksum verifiers
      */
     @Inject
     protected SSTableUploadHandler(Vertx vertx,
@@ -83,7 +87,8 @@
                                    SSTableUploadsPathBuilder uploadPathBuilder,
                                    ExecutorPools executorPools,
                                    CassandraInputValidator validator,
-                                   SidecarStats sidecarStats)
+                                   SidecarStats sidecarStats,
+                                   DigestVerifierFactory digestVerifierFactory)
     {
         super(metadataFetcher, executorPools, validator);
         this.fs = vertx.fileSystem();
@@ -92,6 +97,7 @@
         this.uploadPathBuilder = uploadPathBuilder;
         this.limiter = new ConcurrencyLimiter(configuration::concurrentUploadsLimit);
         this.stats = sidecarStats.ssTableStats();
+        this.digestVerifierFactory = digestVerifierFactory;
     }
 
     /**
@@ -124,11 +130,14 @@
         .compose(validRequest -> uploadPathBuilder.resolveStagingDirectory(host))
         .compose(this::ensureSufficientSpaceAvailable)
         .compose(v -> uploadPathBuilder.build(host, request))
-        .compose(uploadDirectory -> uploader.uploadComponent(httpRequest,
-                                                             uploadDirectory,
-                                                             request.component(),
-                                                             request.expectedChecksum(),
-                                                             configuration.filePermissions()))
+        .compose(uploadDirectory -> {
+            DigestVerifier digestVerifier = digestVerifierFactory.verifier(httpRequest.headers());
+            return uploader.uploadComponent(httpRequest,
+                                            uploadDirectory,
+                                            request.component(),
+                                            digestVerifier,
+                                            configuration.filePermissions());
+        })
         .compose(fs::props)
         .onSuccess(fileProps -> {
             long serviceTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index b1af4b7..31ab7b1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -96,8 +96,6 @@
 import org.apache.cassandra.sidecar.stats.SidecarSchemaStats;
 import org.apache.cassandra.sidecar.stats.SidecarStats;
 import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
-import org.apache.cassandra.sidecar.utils.ChecksumVerifier;
-import org.apache.cassandra.sidecar.utils.MD5ChecksumVerifier;
 import org.apache.cassandra.sidecar.utils.TimeProvider;
 
 import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
@@ -430,13 +428,6 @@
 
     @Provides
     @Singleton
-    public ChecksumVerifier checksumVerifier(Vertx vertx)
-    {
-        return new MD5ChecksumVerifier(vertx.fileSystem());
-    }
-
-    @Provides
-    @Singleton
     public SidecarVersionProvider sidecarVersionProvider()
     {
         return new SidecarVersionProvider("/sidecar.version");
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
new file mode 100644
index 0000000..790b54c
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.Digest;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH;
+
+/**
+ * Provides basic functionality to perform digest validations using {@link AsyncFile}
+ *
+ * @param <D> the Digest type
+ */
+public abstract class AsyncFileDigestVerifier<D extends Digest> implements DigestVerifier
+{
+    public static final int DEFAULT_READ_BUFFER_SIZE = 512 * 1024; // 512KiB
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+    protected final FileSystem fs;
+    protected final D digest;
+
+    protected AsyncFileDigestVerifier(FileSystem fs, D digest)
+    {
+        this.fs = fs;
+        this.digest = Objects.requireNonNull(digest, "digest is required");
+    }
+
+    /**
+     * @param filePath path to SSTable component
+     * @return a future String with the component path if verification is a success, otherwise a failed future
+     */
+    @Override
+    public Future<String> verify(String filePath)
+    {
+        logger.debug("Validating {}. expected_digest={}", digest.algorithm(), digest.value());
+
+        return fs.open(filePath, new OpenOptions())
+                 .compose(this::calculateDigest)
+                 .compose(computedDigest -> {
+                     if (!computedDigest.equals(digest.value()))
+                     {
+                         logger.error("Digest mismatch. computed_digest={}, expected_digest={}, algorithm=MD5",
+                                      computedDigest, digest.value());
+                         return Future.failedFuture(new HttpException(CHECKSUM_MISMATCH.code(),
+                                                                      String.format("Digest mismatch. "
+                                                                                    + "expected_digest=%s, "
+                                                                                    + "algorithm=%s",
+                                                                                    digest.value(),
+                                                                                    digest.algorithm())));
+                     }
+                     return Future.succeededFuture(filePath);
+                 });
+    }
+
+    /**
+     * Returns a future with the calculated digest for the provided {@link AsyncFile file}.
+     *
+     * @param asyncFile the async file to use for digest calculation
+     * @return a future with the computed digest for the provided {@link AsyncFile file}
+     */
+    protected abstract Future<String> calculateDigest(AsyncFile asyncFile);
+
+    protected void readFile(AsyncFile file, Promise<String> result, Handler<Buffer> onBufferAvailable,
+                            Handler<Void> onReadComplete)
+    {
+        // Make sure to close the file when complete
+        result.future().onComplete(ignored -> file.end());
+        file.pause()
+            .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE)
+            .handler(onBufferAvailable)
+            .endHandler(onReadComplete)
+            .exceptionHandler(cause -> {
+                logger.error("Error while calculating the {} digest", digest.algorithm(), cause);
+                result.fail(cause);
+            })
+            .resume();
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
similarity index 64%
rename from src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
rename to src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
index e338642..07bb741 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
@@ -23,16 +23,15 @@
 /**
  * Interface to verify integrity of SSTables uploaded.
  * <p>
- * Note: If checksum calculations of multiple files are happening at the same time, we would want to limit concurrent
- * checksum calculations. Since {@link ChecksumVerifier} is currently used only by upload handler, we are not
- * introducing another limit here. Concurrent uploads limit should limit concurrent checksum calculations as well.
+ * Note: If digest calculations of multiple files are happening at the same time, we would want to limit concurrent
+ * digest calculations. Since {@link DigestVerifier} is currently used only by upload handler, we are not
+ * introducing another limit here. Concurrent uploads limit should limit concurrent digest calculations as well.
  */
-public interface ChecksumVerifier
+public interface DigestVerifier
 {
     /**
-     * @param checksum  expected checksum value
-     * @param filePath  path to SSTable component
-     * @return String   component path, if verification is a success, else a failed future is returned
+     * @param filePath path to SSTable component
+     * @return a future String with the component path if verification is a success, otherwise a failed future
      */
-    Future<String> verify(String checksum, String filePath);
+    Future<String> verify(String filePath);
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java
new file mode 100644
index 0000000..3dd64a2
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.FileSystem;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+
+/**
+ * A factory class that returns the {@link DigestVerifier} instance.
+ */
+@Singleton
+public class DigestVerifierFactory
+{
+    @VisibleForTesting
+    static final DigestVerifier FALLBACK_VERIFIER = Future::succeededFuture;
+    private final FileSystem fs;
+
+
+    /**
+     * Constructs a new factory
+     *
+     * @param vertx the vertx instance
+     */
+    @Inject
+    public DigestVerifierFactory(Vertx vertx)
+    {
+        this.fs = vertx.fileSystem();
+    }
+
+    /***
+     * Returns the first match for a {@link DigestVerifier} from the registered list of verifiers. If none of the
+     * verifiers matches, a no-op validator is returned.
+     *
+     * @param headers the request headers used to test whether a {@link DigestVerifier} can be used to verify the
+     *                request
+     * @return the first match for a {@link DigestVerifier} from the registered list of verifiers, or a no-op
+     * verifier if none match
+     */
+    public DigestVerifier verifier(MultiMap headers)
+    {
+        if (headers.contains(CONTENT_XXHASH32))
+        {
+            return XXHash32DigestVerifier.create(fs, headers);
+        }
+        else if (headers.contains(HttpHeaderNames.CONTENT_MD5.toString()))
+        {
+            return MD5DigestVerifier.create(fs, headers);
+        }
+        // Fallback to no-op validator
+        return FALLBACK_VERIFIER;
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
deleted file mode 100644
index 132ff10..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.utils;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Base64;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.vertx.core.Future;
-import io.vertx.core.Promise;
-import io.vertx.core.file.AsyncFile;
-import io.vertx.core.file.FileSystem;
-import io.vertx.core.file.OpenOptions;
-import io.vertx.ext.web.handler.HttpException;
-import org.jetbrains.annotations.VisibleForTesting;
-
-import static org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH;
-
-/**
- * Implementation of {@link ChecksumVerifier}. Here we use MD5 implementation of {@link java.security.MessageDigest}
- * for calculating checksum. And match the calculated checksum with expected checksum obtained from request.
- */
-public class MD5ChecksumVerifier implements ChecksumVerifier
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(MD5ChecksumVerifier.class);
-    public static final int DEFAULT_READ_BUFFER_SIZE = 64 * 1024; // 64KiB
-    private final FileSystem fs;
-
-    public MD5ChecksumVerifier(FileSystem fs)
-    {
-        this.fs = fs;
-    }
-
-    public Future<String> verify(String expectedChecksum, String filePath)
-    {
-        if (expectedChecksum == null)
-        {
-            return Future.succeededFuture(filePath);
-        }
-
-        LOGGER.debug("Validating MD5. expected_checksum={}", expectedChecksum);
-
-        return fs.open(filePath, new OpenOptions())
-                 .compose(this::calculateMD5)
-                 .compose(computedChecksum -> {
-                     if (!expectedChecksum.equals(computedChecksum))
-                     {
-                         LOGGER.error("Checksum mismatch. computed_checksum={}, expected_checksum={}, algorithm=MD5",
-                                      computedChecksum, expectedChecksum);
-                         return Future.failedFuture(new HttpException(CHECKSUM_MISMATCH.code(),
-                                                                      String.format("Checksum mismatch. "
-                                                                                    + "expected_checksum=%s, "
-                                                                                    + "algorithm=MD5",
-                                                                                    expectedChecksum)));
-                     }
-                     return Future.succeededFuture(filePath);
-                 });
-    }
-
-    @VisibleForTesting
-    Future<String> calculateMD5(AsyncFile file)
-    {
-        MessageDigest digest;
-        try
-        {
-            digest = MessageDigest.getInstance("MD5");
-        }
-        catch (NoSuchAlgorithmException e)
-        {
-            return Future.failedFuture(e);
-        }
-
-        Promise<String> result = Promise.promise();
-        file.pause()
-            .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE)
-            .handler(buf -> digest.update(buf.getBytes()))
-            .endHandler(_v -> {
-                result.complete(Base64.getEncoder().encodeToString(digest.digest()));
-                file.end();
-            })
-            .exceptionHandler(cause -> {
-                LOGGER.error("Error while calculating MD5 checksum", cause);
-                result.fail(cause);
-                file.end();
-            })
-            .resume();
-        return result.future();
-    }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java
new file mode 100644
index 0000000..ed4218d
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Implementation of {@link DigestVerifier}. Here we use MD5 implementation of {@link java.security.MessageDigest}
+ * for calculating digest and match the calculated digest with expected digest obtained from request.
+ */
+public class MD5DigestVerifier extends AsyncFileDigestVerifier<MD5Digest>
+{
+    protected MD5DigestVerifier(FileSystem fs, MD5Digest digest)
+    {
+        super(fs, digest);
+    }
+
+    public static DigestVerifier create(FileSystem fs, MultiMap headers)
+    {
+        MD5Digest md5Digest = new MD5Digest(headers.get(HttpHeaderNames.CONTENT_MD5.toString()));
+        return new MD5DigestVerifier(fs, md5Digest);
+    }
+
+    @Override
+    @VisibleForTesting
+    protected Future<String> calculateDigest(AsyncFile file)
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("MD5");
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            return Future.failedFuture(e);
+        }
+
+        Promise<String> result = Promise.promise();
+
+        readFile(file, result, buf -> digest.update(buf.getBytes()),
+                 _v -> result.complete(Base64.getEncoder().encodeToString(digest.digest())));
+
+        return result.future();
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
index 465f0a4..e7f59a4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
@@ -39,6 +39,7 @@
 import io.vertx.core.file.OpenOptions;
 import io.vertx.core.streams.ReadStream;
 import io.vertx.core.streams.WriteStream;
+import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
 
 /**
  * A class that handles SSTable Uploads
@@ -50,23 +51,19 @@
     private static final String DEFAULT_TEMP_SUFFIX = ".tmp";
 
     private final FileSystem fs;
-    private final ChecksumVerifier checksumVerifier;
     private final SidecarRateLimiter rateLimiter;
 
     /**
      * Constructs an instance of {@link SSTableUploader} with provided params for uploading an SSTable component.
      *
-     * @param vertx            Vertx reference
-     * @param checksumVerifier verifier for checking integrity of upload
-     * @param rateLimiter      rate limiter for uploading SSTable components
+     * @param vertx       Vertx reference
+     * @param rateLimiter rate limiter for uploading SSTable components
      */
     @Inject
     public SSTableUploader(Vertx vertx,
-                           ChecksumVerifier checksumVerifier,
                            @Named("IngressFileRateLimiter") SidecarRateLimiter rateLimiter)
     {
         this.fs = vertx.fileSystem();
-        this.checksumVerifier = checksumVerifier;
         this.rateLimiter = rateLimiter;
     }
 
@@ -76,14 +73,14 @@
      * @param readStream        server request from which file upload is acquired
      * @param uploadDirectory   the absolute path to the upload directory in the target {@code fs}
      * @param componentFileName the file name of the component
-     * @param expectedChecksum  for verifying upload integrity, passed in through request
+     * @param digestVerifier    the digest verifier instance
      * @param filePermissions   specifies the posix file permissions used to create the SSTable file
      * @return path of SSTable component to which data was uploaded
      */
     public Future<String> uploadComponent(ReadStream<Buffer> readStream,
                                           String uploadDirectory,
                                           String componentFileName,
-                                          String expectedChecksum,
+                                          DigestVerifier digestVerifier,
                                           String filePermissions)
     {
 
@@ -92,15 +89,16 @@
 
         return fs.mkdirs(uploadDirectory) // ensure the parent directory is created
                  .compose(v -> createTempFile(uploadDirectory, componentFileName, filePermissions))
-                 .compose(tempFilePath -> streamAndVerify(readStream, tempFilePath, expectedChecksum))
+                 .compose(tempFilePath -> streamAndVerify(readStream, tempFilePath, digestVerifier))
                  .compose(verifiedTempFilePath -> moveAtomicallyWithFallBack(verifiedTempFilePath, targetPath));
     }
 
-    private Future<String> streamAndVerify(ReadStream<Buffer> readStream, String tempFilePath, String expectedChecksum)
+    private Future<String> streamAndVerify(ReadStream<Buffer> readStream, String tempFilePath,
+                                           DigestVerifier digestVerifier)
     {
         // pipe read stream to temp file
         return streamToFile(readStream, tempFilePath)
-               .compose(v -> checksumVerifier.verify(expectedChecksum, tempFilePath))
+               .compose(v -> digestVerifier.verify(tempFilePath))
                .onFailure(throwable -> fs.delete(tempFilePath));
     }
 
@@ -128,7 +126,9 @@
         LOGGER.debug("Moving from={} to={}", source, target);
         return fs.move(source, target, new CopyOptions().setAtomicMove(true))
                  .recover(cause -> {
-                     if (hasCause(cause, AtomicMoveNotSupportedException.class, 10))
+                     Exception atomicMoveNotSupportedException =
+                     ThrowableUtils.getCause(cause, AtomicMoveNotSupportedException.class);
+                     if (atomicMoveNotSupportedException != null)
                      {
                          LOGGER.warn("Failed to perform atomic move from={} to={}", source, target, cause);
                          return fs.move(source, target, new CopyOptions().setAtomicMove(false));
@@ -139,33 +139,6 @@
     }
 
     /**
-     * Returns true if a cause of type {@code type} is found in the stack trace before exceeding the {@code depth}
-     *
-     * @param cause the original cause
-     * @param type  the exception type to test
-     * @param depth the maximum depth to check in the stack trace
-     * @return true if the exception of type {@code type} exists in the stacktrace, false otherwise
-     */
-    private static boolean hasCause(Throwable cause, Class<? extends Throwable> type, int depth)
-    {
-        int i = 0;
-        while (i < depth)
-        {
-            if (cause == null)
-                return false;
-
-            if (type.isInstance(cause))
-                return true;
-
-            cause = cause.getCause();
-
-            i++;
-        }
-        return false;
-    }
-
-
-    /**
      * A {@link WriteStream} implementation that supports rate limiting.
      */
     public static class RateLimitedWriteStream implements WriteStream<Buffer>
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java
new file mode 100644
index 0000000..b1944c6
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import net.jpountz.xxhash.StreamingXXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
+
+/**
+ * Implementation of {@link DigestVerifier} to calculate the digest and match the calculated digest
+ * with the expected digest.
+ */
+public class XXHash32DigestVerifier extends AsyncFileDigestVerifier<XXHash32Digest>
+{
+    protected XXHash32DigestVerifier(FileSystem fs, XXHash32Digest digest)
+    {
+        super(fs, digest);
+    }
+
+    public static XXHash32DigestVerifier create(FileSystem fs, MultiMap headers)
+    {
+        XXHash32Digest digest = new XXHash32Digest(headers.get(CONTENT_XXHASH32), headers.get(CONTENT_XXHASH32_SEED));
+        return new XXHash32DigestVerifier(fs, digest);
+    }
+
+    @Override
+    @VisibleForTesting
+    protected Future<String> calculateDigest(AsyncFile file)
+    {
+        Promise<String> result = Promise.promise();
+        Future<String> future = result.future();
+
+        // might have shared hashers with ThreadLocal
+        XXHashFactory factory = XXHashFactory.safeInstance();
+
+        int seed = maybeGetSeedOrDefault();
+        StreamingXXHash32 hasher = factory.newStreamingHash32(seed);
+
+        future.onComplete(ignored -> hasher.close());
+
+        readFile(file, result, buf -> {
+                     byte[] bytes = buf.getBytes();
+                     hasher.update(bytes, 0, bytes.length);
+                 },
+                 _v -> result.complete(Long.toHexString(hasher.getValue())));
+
+        return future;
+    }
+
+    protected int maybeGetSeedOrDefault()
+    {
+        String seedHex = digest.seedHex();
+        if (seedHex != null)
+        {
+            return (int) Long.parseLong(seedHex, 16);
+        }
+        return 0x9747b28c; // random seed for initializing
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
index 49554c6..2694fb4 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.sidecar.routes.sstableuploads;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -27,7 +26,6 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
@@ -45,6 +43,9 @@
 import io.vertx.ext.web.client.WebClient;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.Digest;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
 import org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus;
 import org.apache.cassandra.sidecar.snapshots.SnapshotUtils;
 import org.assertj.core.data.Percentage;
@@ -58,6 +59,7 @@
 import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE;
 import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
 import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static org.apache.cassandra.sidecar.utils.TestFileUtils.prepareTestFile;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.when;
 
@@ -75,7 +77,7 @@
     void testUploadWithoutMd5_expectSuccessfulUpload(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false);
     }
 
@@ -83,15 +85,63 @@
     void testUploadWithCorrectMd5_expectSuccessfulUpload(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-md5.db", "jXd/OF09/siBXSD3SWAm3A==",
-                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false);
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-md5.db",
+                                   new MD5Digest("jXd/OF09/siBXSD3SWAm3A=="),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   HttpResponseStatus.OK.code(),
+                                   false);
+    }
+
+    @Test
+    void testUploadWithCorrectXXHash_expectSuccessfulUpload(VertxTestContext context) throws IOException
+    {
+        UUID uploadId = UUID.randomUUID();
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-xxhash.db",
+                                   new XXHash32Digest("7a28edc0"),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   HttpResponseStatus.OK.code(),
+                                   false);
+    }
+
+    @Test
+    void testUploadWithCorrectXXHashAndCustomSeed_expectSuccessfulUpload(VertxTestContext context) throws IOException
+    {
+        UUID uploadId = UUID.randomUUID();
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-xxhash.db",
+                                   new XXHash32Digest("ffffffffb9510d6b", "55555555"),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   HttpResponseStatus.OK.code(),
+                                   false);
     }
 
     @Test
     void testUploadWithIncorrectMd5_expectErrorCode(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-md5.db", "incorrectMd5",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-md5.db",
+                                   new MD5Digest("incorrectMd5"),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
+                                   false);
+    }
+
+    @Test
+    void testUploadWithIncorrectXXHash_expectErrorCode(VertxTestContext context) throws IOException
+    {
+        UUID uploadId = UUID.randomUUID();
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-xxhash.db",
+                                   new XXHash32Digest("incorrectXXHash"),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
+                                   false);
+    }
+
+    @Test
+    void testUploadWithIncorrectXXHashAndCustomSeed_expectErrorCode(VertxTestContext context) throws IOException
+    {
+        UUID uploadId = UUID.randomUUID();
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-xxhash.db",
+                                   new XXHash32Digest("7a28edc0", "bad"),
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
                                    false);
@@ -101,26 +151,27 @@
     void testInvalidFileName_expectErrorCode(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "ks$tbl-me-4-big-Data.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "ks$tbl-me-4-big-Data.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
     }
 
     @Test
-    void testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context) throws IOException
+    void testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context)
     {
         UUID uploadId = UUID.randomUUID();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-content-length.db",
-                                   "jXd/OF09/siBXSD3SWAm3A==", 0, HttpResponseStatus.OK.code(), false);
+                                   new MD5Digest("jXd/OF09/siBXSD3SWAm3A=="), 0, HttpResponseStatus.OK.code(), false);
     }
 
     @Test
-    void testUploadTimeout_expectTimeoutError(VertxTestContext context) throws IOException
+    void testUploadTimeout_expectTimeoutError(VertxTestContext context)
     {
         // if we send more than actual length, vertx goes hung, probably looking for more data than exists in the file,
         // we should see timeout error in this case
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-higher-content-length.db", "", 1000, -1, true);
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-higher-content-length.db", null, 1000, -1,
+                                   true);
     }
 
     @Test
@@ -128,14 +179,14 @@
     {
         UUID uploadId = UUID.randomUUID();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-lesser-content-length.db",
-                                   "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
+                                   null, Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
                                    false);
     }
 
     @Test
     void testInvalidUploadId(VertxTestContext context) throws IOException
     {
-        sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl", "with-lesser-content-length.db", "",
+        sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl", "with-lesser-content-length.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
                                    false, response -> {
             JsonObject error = response.bodyAsJsonObject();
@@ -149,7 +200,7 @@
     void testInvalidKeyspace(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace", "tbl", "with-lesser-content-length.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace", "tbl", "with-lesser-content-length.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
     }
@@ -158,7 +209,7 @@
     void testInvalidTable(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "invalidTableName", "with-lesser-content-length.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "invalidTableName", "with-lesser-content-length.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
     }
@@ -169,7 +220,7 @@
         when(mockSSTableUploadConfiguration.minimumSpacePercentageRequired()).thenReturn(100F);
 
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    HttpResponseStatus.INSUFFICIENT_STORAGE.code(), false);
     }
@@ -180,7 +231,7 @@
         when(mockSSTableUploadConfiguration.concurrentUploadsLimit()).thenReturn(0);
 
         UUID uploadId = UUID.randomUUID();
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    HttpResponseStatus.TOO_MANY_REQUESTS.code(), false);
     }
@@ -193,13 +244,13 @@
         UUID uploadId = UUID.randomUUID();
         CountDownLatch latch = new CountDownLatch(1);
         sendUploadRequestAndVerify(latch, context, uploadId.toString(), "invalidKeyspace", "tbl",
-                                   "without-md5.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   "without-md5.db", null, Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    HttpResponseStatus.BAD_REQUEST.code(), false);
 
         assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
 
         // checking if permits were released after bad requests
-        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false);
     }
 
@@ -209,7 +260,7 @@
         String uploadId = UUID.randomUUID().toString();
         when(mockSSTableUploadConfiguration.filePermissions()).thenReturn("rwxr-xr-x");
 
-        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "without-md5.db", "",
+        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "without-md5.db", null,
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(),
                                    false, response -> {
 
@@ -247,7 +298,7 @@
 
         long startTime = System.nanoTime();
         String uploadId = UUID.randomUUID().toString();
-        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "1MB-File-Data.db", "",
+        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "1MB-File-Data.db", null,
                                    Files.size(largeFilePath), HttpResponseStatus.OK.code(),
                                    false, response -> {
 
@@ -269,7 +320,7 @@
 
         long startTime = System.nanoTime();
         String uploadId = UUID.randomUUID().toString();
-        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "1MB-File-Data.db", "",
+        sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "1MB-File-Data.db", null,
                                    Files.size(largeFilePath), HttpResponseStatus.OK.code(),
                                    false, response -> {
 
@@ -286,13 +337,13 @@
                                             String keyspace,
                                             String tableName,
                                             String targetFileName,
-                                            String expectedMd5,
+                                            Digest expectedDigest,
                                             long fileLength,
                                             int expectedRetCode,
                                             boolean expectTimeout)
     {
         sendUploadRequestAndVerify(null, context, uploadId.toString(), keyspace, tableName, targetFileName,
-                                   expectedMd5, fileLength, expectedRetCode, expectTimeout);
+                                   expectedDigest, fileLength, expectedRetCode, expectTimeout);
     }
 
     private void sendUploadRequestAndVerify(CountDownLatch latch,
@@ -301,7 +352,7 @@
                                             String keyspace,
                                             String tableName,
                                             String targetFileName,
-                                            String expectedMd5,
+                                            Digest expectedDigest,
                                             long fileLength,
                                             int expectedRetCode,
                                             boolean expectTimeout)
@@ -312,7 +363,7 @@
                                    keyspace,
                                    tableName,
                                    targetFileName,
-                                   expectedMd5,
+                                   expectedDigest,
                                    fileLength,
                                    expectedRetCode,
                                    expectTimeout,
@@ -326,7 +377,7 @@
                                             String keyspace,
                                             String tableName,
                                             String targetFileName,
-                                            String expectedMd5,
+                                            Digest expectedDigest,
                                             long fileLength,
                                             int expectedRetCode,
                                             boolean expectTimeout,
@@ -337,9 +388,9 @@
         String testRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" + keyspace
                            + "/tables/" + tableName + "/components/" + targetFileName;
         HttpRequest<Buffer> req = client.put(server.actualPort(), "localhost", testRoute);
-        if (!expectedMd5.isEmpty())
+        if (expectedDigest != null)
         {
-            req.putHeader(HttpHeaderNames.CONTENT_MD5.toString(), expectedMd5);
+            req.headers().addAll(expectedDigest.headers());
         }
         if (fileLength != 0)
         {
@@ -383,24 +434,4 @@
             client.close();
         });
     }
-
-    static Path prepareTestFile(Path directory, String fileName, long sizeInBytes) throws IOException
-    {
-        Path filePath = directory.resolve(fileName);
-        Files.deleteIfExists(filePath);
-
-        byte[] buffer = new byte[1024];
-        try (OutputStream outputStream = Files.newOutputStream(filePath))
-        {
-            int written = 0;
-            while (written < sizeInBytes)
-            {
-                ThreadLocalRandom.current().nextBytes(buffer);
-                int toWrite = (int) Math.min(buffer.length, sizeInBytes - written);
-                outputStream.write(buffer, 0, toWrite);
-                written += toWrite;
-            }
-        }
-        return filePath;
-    }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
new file mode 100644
index 0000000..810e015
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.impl.headers.HeadersMultiMap;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static org.apache.cassandra.sidecar.utils.DigestVerifierFactory.FALLBACK_VERIFIER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link DigestVerifierFactory}
+ */
+class DigestVerifierFactoryTest
+{
+    MultiMap options;
+    Vertx vertx = Vertx.vertx();
+
+    @BeforeEach
+    void setup()
+    {
+        options = new HeadersMultiMap();
+    }
+
+
+    @Test
+    void testEmptyFactoryReturnsFallbackVerifier()
+    {
+        DigestVerifier verifier = new DigestVerifierFactory(vertx).verifier(options);
+        assertThat(verifier).as("should fallback to the fallback verifier when no verifiers are configured")
+                            .isNotNull()
+                            .isSameAs(FALLBACK_VERIFIER);
+    }
+
+    @Test
+    void testMd5Verifier()
+    {
+        options.set("content-md5", "md5-header");
+        DigestVerifier verifier = new DigestVerifierFactory(vertx).verifier(options);
+
+        assertThat(verifier).as("MD5DigestVerifier can verify MD5 content headers")
+                            .isNotNull()
+                            .isInstanceOf(MD5DigestVerifier.class);
+    }
+
+    @Test
+    void testXXHashVerifier()
+    {
+        options.set(CONTENT_XXHASH32, "xxhash-header");
+        DigestVerifier verifier = new DigestVerifierFactory(vertx).verifier(options);
+
+        assertThat(verifier).as("XXHashDigestVerifier can verify XXHash content headers")
+                            .isNotNull()
+                            .isInstanceOf(XXHash32DigestVerifier.class);
+    }
+
+    @Test
+    void testFirstVerifierTakesPrecedence()
+    {
+        options.set("content-md5", "md5-header")
+               .set(CONTENT_XXHASH32, "xxhash-header");
+        DigestVerifier verifier = new DigestVerifierFactory(vertx).verifier(options);
+
+        assertThat(verifier).as("XXHashDigestVerifier is selected when both headers are present")
+                            .isNotNull()
+                            .isInstanceOf(XXHash32DigestVerifier.class);
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
deleted file mode 100644
index 8eb5680..0000000
--- a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.utils;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Path;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Base64;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import io.vertx.core.Future;
-import io.vertx.core.Vertx;
-import io.vertx.core.file.AsyncFile;
-import io.vertx.core.file.FileSystem;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/**
- * Unit tests for {@link MD5ChecksumVerifier}
- */
-class MD5ChecksumVerifierTest
-{
-    static Vertx vertx;
-    static ExposeAsyncFileMD5ChecksumVerifier verifier;
-
-    @TempDir
-    Path tempDir;
-
-    @BeforeAll
-    static void setup()
-    {
-        vertx = Vertx.vertx();
-        verifier = new ExposeAsyncFileMD5ChecksumVerifier(vertx.fileSystem());
-    }
-
-    @Test
-    void testFileDescriptorsClosedWithValidChecksum() throws IOException, NoSuchAlgorithmException,
-                                                             InterruptedException
-    {
-        byte[] randomBytes = generateRandomBytes();
-        Path randomFilePath = writeBytesToRandomFile(randomBytes);
-        String expectedChecksum = Base64.getEncoder()
-                                        .encodeToString(MessageDigest.getInstance("MD5")
-                                                                     .digest(randomBytes));
-
-        runTestScenario(randomFilePath, expectedChecksum);
-    }
-
-    @Test
-    void testFileDescriptorsClosedWithInvalidChecksum() throws IOException, InterruptedException
-    {
-        Path randomFilePath = writeBytesToRandomFile(generateRandomBytes());
-        runTestScenario(randomFilePath, "invalid");
-    }
-
-    private void runTestScenario(Path filePath, String checksum) throws InterruptedException
-    {
-        CountDownLatch latch = new CountDownLatch(1);
-        verifier.verify(checksum, filePath.toAbsolutePath().toString())
-                .onComplete(complete -> latch.countDown());
-
-        assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
-
-        assertThat(verifier.file).isNotNull();
-        // we can't close the file if it's already closed, so we expect the exception here
-        assertThatThrownBy(() -> verifier.file.end())
-        .isInstanceOf(IllegalStateException.class)
-        .hasMessageContaining("File handle is closed");
-    }
-
-    private byte[] generateRandomBytes()
-    {
-        byte[] bytes = new byte[1024];
-        ThreadLocalRandom.current().nextBytes(bytes);
-        return bytes;
-    }
-
-    private Path writeBytesToRandomFile(byte[] bytes) throws IOException
-    {
-        Path tempPath = tempDir.resolve("random-file.txt");
-        try (RandomAccessFile writer = new RandomAccessFile(tempPath.toFile(), "rw"))
-        {
-            writer.write(bytes);
-        }
-        return tempPath;
-    }
-
-    /**
-     * Class that extends from {@link MD5ChecksumVerifier} for testing purposes and holds a reference to the
-     * {@link AsyncFile} to ensure that the file has been closed.
-     */
-    static class ExposeAsyncFileMD5ChecksumVerifier extends MD5ChecksumVerifier
-    {
-        AsyncFile file;
-
-        public ExposeAsyncFileMD5ChecksumVerifier(FileSystem fs)
-        {
-            super(fs);
-        }
-
-        @Override
-        Future<String> calculateMD5(AsyncFile file)
-        {
-            this.file = file;
-            return super.calculateMD5(file);
-        }
-    }
-}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
new file mode 100644
index 0000000..550ef8e
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link MD5DigestVerifier}
+ */
+class MD5DigestVerifierTest
+{
+    static Vertx vertx;
+
+    @TempDir
+    Path tempDir;
+
+    @BeforeAll
+    static void setup()
+    {
+        vertx = Vertx.vertx();
+    }
+
+    @Test
+    void testFileDescriptorsClosedWithValidDigest() throws IOException, NoSuchAlgorithmException,
+                                                           InterruptedException
+    {
+        Path randomFilePath = TestFileUtils.prepareTestFile(tempDir, "random-file.txt", 1024);
+        byte[] randomBytes = Files.readAllBytes(randomFilePath);
+        String expectedDigest = Base64.getEncoder()
+                                      .encodeToString(MessageDigest.getInstance("MD5")
+                                                                   .digest(randomBytes));
+
+        runTestScenario(randomFilePath, expectedDigest);
+    }
+
+    @Test
+    void testFileDescriptorsClosedWithInvalidDigest() throws IOException, InterruptedException
+    {
+        Path randomFilePath = TestFileUtils.prepareTestFile(tempDir, "random-file.txt", 1024);
+        runTestScenario(randomFilePath, "invalid");
+    }
+
+    private void runTestScenario(Path filePath, String digest) throws InterruptedException
+    {
+        CountDownLatch latch = new CountDownLatch(1);
+        ExposeAsyncFileMD5DigestVerifier verifier = newVerifier(new MD5Digest(digest));
+        verifier.verify(filePath.toAbsolutePath().toString())
+                .onComplete(complete -> latch.countDown());
+
+        assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
+
+        assertThat(verifier.file).isNotNull();
+        // we can't close the file if it's already closed, so we expect the exception here
+        assertThatThrownBy(() -> verifier.file.end())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("File handle is closed");
+    }
+
+    static ExposeAsyncFileMD5DigestVerifier newVerifier(MD5Digest digest)
+    {
+        return new ExposeAsyncFileMD5DigestVerifier(vertx.fileSystem(), digest);
+    }
+
+    /**
+     * Class that extends from {@link MD5DigestVerifier} for testing purposes and holds a reference to the
+     * {@link AsyncFile} to ensure that the file has been closed.
+     */
+    static class ExposeAsyncFileMD5DigestVerifier extends MD5DigestVerifier
+    {
+        AsyncFile file;
+
+        public ExposeAsyncFileMD5DigestVerifier(FileSystem fs, MD5Digest md5Digest)
+        {
+            super(fs, md5Digest);
+        }
+
+        @Override
+        protected Future<String> calculateDigest(AsyncFile file)
+        {
+            this.file = file;
+            return super.calculateDigest(file);
+        }
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java b/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java
new file mode 100644
index 0000000..195ef34
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * File utilities for tests
+ */
+public class TestFileUtils
+{
+    /**
+     * Writes random data to a file with name {@code filename} under the specified {@code directory} with
+     * the specified size in bytes.
+     *
+     * @param directory   the directory where to
+     * @param fileName    the name of the desired file to create
+     * @param sizeInBytes the size of the files in bytes
+     * @return the path of the file that was recently created
+     * @throws IOException when file creation or writing to the file fails
+     */
+    public static Path prepareTestFile(Path directory, String fileName, long sizeInBytes) throws IOException
+    {
+        Path filePath = directory.resolve(fileName);
+        Files.deleteIfExists(filePath);
+
+        byte[] buffer = new byte[1024];
+        try (OutputStream outputStream = Files.newOutputStream(filePath))
+        {
+            int written = 0;
+            while (written < sizeInBytes)
+            {
+                ThreadLocalRandom.current().nextBytes(buffer);
+                int toWrite = (int) Math.min(buffer.length, sizeInBytes - written);
+                outputStream.write(buffer, 0, toWrite);
+                written += toWrite;
+            }
+        }
+        return filePath;
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
new file mode 100644
index 0000000..ff7566e
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
+import org.assertj.core.api.InstanceOfAssertFactories;
+
+import static org.apache.cassandra.sidecar.restore.RestoreJobUtil.checksum;
+import static org.assertj.core.api.Assertions.as;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.from;
+
+/**
+ * Unit tests for {@link XXHash32DigestVerifier}
+ */
+class XXHash32DigestVerifierTest
+{
+    static Vertx vertx;
+
+    @TempDir
+    static Path tempDir;
+    static Path randomFilePath;
+
+    @BeforeAll
+    static void setup() throws IOException
+    {
+        vertx = Vertx.vertx();
+
+        randomFilePath = TestFileUtils.prepareTestFile(tempDir, "random-file.txt", 1024);
+    }
+
+    @Test
+    void failsWhenDigestIsNull()
+    {
+        assertThatNullPointerException().isThrownBy(() -> newVerifier(null)).withMessage("digest is required");
+    }
+
+    @Test
+    void testFileDescriptorsClosedWithValidDigest() throws IOException, InterruptedException
+    {
+        XXHash32Digest digest = new XXHash32Digest(checksum(randomFilePath.toFile()));
+        runTestScenario(randomFilePath, digest, false);
+    }
+
+    @Test
+    void failsWithNonDefaultSeedAndSeedIsNotPassedAsAnOption() throws IOException, InterruptedException
+    {
+        XXHash32Digest digest = new XXHash32Digest(checksum(randomFilePath.toFile(), 0x55555555));
+        runTestScenario(randomFilePath, digest, true);
+    }
+
+    @Test
+    void testWithCustomSeed() throws IOException, InterruptedException
+    {
+        int seed = 0x55555555;
+        XXHash32Digest digest = new XXHash32Digest(checksum(randomFilePath.toFile(), seed), seed);
+        runTestScenario(randomFilePath, digest, false);
+    }
+
+    @Test
+    void testFileDescriptorsClosedWithInvalidDigest() throws InterruptedException
+    {
+        runTestScenario(randomFilePath, new XXHash32Digest("invalid"), true);
+    }
+
+    private void runTestScenario(Path filePath, XXHash32Digest digest,
+                                 boolean errorExpectedDuringValidation) throws InterruptedException
+    {
+        CountDownLatch latch = new CountDownLatch(1);
+        ExposeAsyncFileXXHash32DigestVerifier verifier = newVerifier(digest);
+        verifier.verify(filePath.toAbsolutePath().toString())
+                .onComplete(complete -> {
+                    if (errorExpectedDuringValidation)
+                    {
+                        assertThat(complete.failed()).isTrue();
+                        assertThat(complete.cause())
+                        .isInstanceOf(HttpException.class)
+                        .extracting(from(t -> ((HttpException) t).getPayload()), as(InstanceOfAssertFactories.STRING))
+                        .contains("Digest mismatch. expected_digest=" + digest.value());
+                    }
+                    else
+                    {
+                        assertThat(complete.failed()).isFalse();
+                        assertThat(complete.result()).endsWith("random-file.txt");
+                    }
+                    latch.countDown();
+                });
+
+        assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
+
+        assertThat(verifier.file).isNotNull();
+        // we can't close the file if it's already closed, so we expect the exception here
+        assertThatThrownBy(() -> verifier.file.end())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("File handle is closed");
+    }
+
+    static ExposeAsyncFileXXHash32DigestVerifier newVerifier(XXHash32Digest digest)
+    {
+        return new ExposeAsyncFileXXHash32DigestVerifier(vertx.fileSystem(), digest);
+    }
+
+    /**
+     * Class that extends from {@link XXHash32DigestVerifier} for testing purposes and holds a reference to the
+     * {@link AsyncFile} to ensure that the file has been closed.
+     */
+    static class ExposeAsyncFileXXHash32DigestVerifier extends XXHash32DigestVerifier
+    {
+        AsyncFile file;
+
+        public ExposeAsyncFileXXHash32DigestVerifier(FileSystem fs, XXHash32Digest digest)
+        {
+            super(fs, digest);
+        }
+
+        @Override
+        protected Future<String> calculateDigest(AsyncFile file)
+        {
+            this.file = file;
+            return super.calculateDigest(file);
+        }
+    }
+}