CASSANDRASC-97: Add support for additional digest validation during SSTable upload
In this commit we add the ability to support additional digest algorithms for verification
during SSTable uploads. We introduce the `DigestVerifierFactory` which now supports
XXHash32 and MD5 `DigestVerifier`s.
Clients can now send an XXHash32 digest instead of MD5, giving both clients and the server
the flexibility to utilize a more performant hashing algorithm.
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-97
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b5a9b9..3f1b4cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Add support for additional digest validation during SSTable upload (CASSANDRASC-97)
* Add sidecar client changes for restore from S3 (CASSANDRASC-95)
* Add restore SSTables from S3 into Cassandra feature to Cassandra Sidecar (CASSANDRASC-92)
* Define routing order for http routes (CASSANDRASC-93)
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 9433e3a..7af57e9 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -44,6 +44,7 @@
import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
import org.apache.cassandra.sidecar.client.selection.SingleInstanceSelectionPolicy;
+import org.apache.cassandra.sidecar.common.data.Digest;
import org.apache.cassandra.sidecar.common.utils.HttpRange;
import org.jetbrains.annotations.Nullable;
@@ -449,14 +450,14 @@
* @param tableName the table name in Cassandra
* @param uploadId an identifier for the upload
* @param component SSTable component being uploaded
- * @param checksum hash value to check integrity of SSTable component uploaded
+ * @param digest digest value to check integrity of SSTable component uploaded
* @param filename the path to the file to be uploaded
* @return a reference to this Builder
*/
public Builder uploadSSTableRequest(String keyspace, String tableName, String uploadId, String component,
- String checksum, String filename)
+ Digest digest, String filename)
{
- return request(new UploadSSTableRequest(keyspace, tableName, uploadId, component, checksum, filename));
+ return request(new UploadSSTableRequest(keyspace, tableName, uploadId, component, digest, filename));
}
/**
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 3048808..42b7093 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -45,6 +45,7 @@
import org.apache.cassandra.sidecar.common.data.CreateRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.data.CreateRestoreJobResponsePayload;
import org.apache.cassandra.sidecar.common.data.CreateSliceRequestPayload;
+import org.apache.cassandra.sidecar.common.data.Digest;
import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.data.HealthResponse;
import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
@@ -391,7 +392,7 @@
* @param table the table name in Cassandra
* @param uploadId the unique identifier for the upload
* @param componentName the name of the SSTable component
- * @param checksum hash value to check integrity of SSTable component uploaded
+ * @param digest digest value to check integrity of SSTable component uploaded
* @param filename the path to the file to be uploaded
* @return a completable future for the request
*/
@@ -400,7 +401,7 @@
String table,
String uploadId,
String componentName,
- String checksum,
+ Digest digest,
String filename)
{
return executor.executeRequestAsync(requestBuilder().singleInstanceSelectionPolicy(instance)
@@ -408,7 +409,7 @@
table,
uploadId,
componentName,
- checksum,
+ digest,
filename)
.build());
}
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java b/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
index ea0fe98..b93c82a 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/request/UploadSSTableRequest.java
@@ -25,16 +25,16 @@
import java.util.Map;
import java.util.Objects;
-import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.Digest;
/**
* Represents a request to upload an SSTable component
*/
public class UploadSSTableRequest extends Request implements UploadableRequest
{
- private final String expectedChecksum;
+ private final Digest digest;
private final String filename;
/**
@@ -44,14 +44,14 @@
* @param table the table name in Cassandra
* @param uploadId an identifier for the upload
* @param component SSTable component being uploaded
- * @param checksum hash value to check integrity of SSTable component uploaded
+ * @param digest digest value to check integrity of SSTable component uploaded
* @param filename the path to the file to be uploaded
*/
public UploadSSTableRequest(String keyspace, String table, String uploadId, String component,
- String checksum, String filename)
+ Digest digest, String filename)
{
super(requestURI(keyspace, table, uploadId, component));
- this.expectedChecksum = checksum;
+ this.digest = digest;
this.filename = Objects.requireNonNull(filename, "the filename is must be non-null");
if (!Files.exists(Paths.get(filename)))
@@ -72,12 +72,12 @@
@Override
public Map<String, String> headers()
{
- if (expectedChecksum == null)
+ if (digest == null)
{
return super.headers();
}
Map<String, String> headers = new HashMap<>(super.headers());
- headers.put(HttpHeaderNames.CONTENT_MD5.toString(), expectedChecksum);
+ headers.putAll(digest.headers());
return Collections.unmodifiableMap(headers);
}
diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index fb1102c..e06b709 100644
--- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -63,6 +63,7 @@
import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.data.HealthResponse;
import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
import org.apache.cassandra.sidecar.common.data.RingEntry;
import org.apache.cassandra.sidecar.common.data.RingResponse;
@@ -70,6 +71,7 @@
import org.apache.cassandra.sidecar.common.data.SchemaResponse;
import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
import org.apache.cassandra.sidecar.common.utils.HttpRange;
import org.apache.cassandra.sidecar.foundation.RestoreJobSecretsGen;
@@ -78,6 +80,8 @@
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpResponseStatus.PARTIAL_CONTENT;
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatException;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -689,7 +693,7 @@
}
@Test
- void testUploadSSTableWithoutChecksum(@TempDir Path tempDirectory) throws Exception
+ void testUploadSSTableWithoutDigest(@TempDir Path tempDirectory) throws Exception
{
Path fileToUpload = prepareFile(tempDirectory);
try (MockWebServer server = new MockWebServer())
@@ -722,7 +726,7 @@
}
@Test
- void testUploadSSTableWithChecksum(@TempDir Path tempDirectory) throws Exception
+ void testUploadSSTableWithMD5Digest(@TempDir Path tempDirectory) throws Exception
{
Path fileToUpload = prepareFile(tempDirectory);
try (MockWebServer server = new MockWebServer())
@@ -735,7 +739,7 @@
"cyclist_name",
"0000-0000",
"nb-1-big-TOC.txt",
- "15a69dc6501aa5ae17af037fe053f610",
+ new MD5Digest("15a69dc6501aa5ae17af037fe053f610"),
fileToUpload.toString())
.get(30, TimeUnit.SECONDS);
@@ -756,6 +760,76 @@
}
@Test
+ void testUploadSSTableWithXXHashDigest(@TempDir Path tempDirectory) throws Exception
+ {
+ Path fileToUpload = prepareFile(tempDirectory);
+ try (MockWebServer server = new MockWebServer())
+ {
+ server.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+ SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server);
+ client.uploadSSTableRequest(sidecarInstance,
+ "cycling",
+ "cyclist_name",
+ "0000-0000",
+ "nb-1-big-TOC.txt",
+ new XXHash32Digest("15a69dc6501aa5ae17af037fe053f610"),
+ fileToUpload.toString())
+ .get(30, TimeUnit.SECONDS);
+
+ assertThat(server.getRequestCount()).isEqualTo(1);
+ RecordedRequest request = server.takeRequest();
+ assertThat(request.getPath())
+ .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE
+ .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000")
+ .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "cycling")
+ .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name")
+ .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt"));
+ assertThat(request.getMethod()).isEqualTo("PUT");
+ assertThat(request.getHeader(CONTENT_XXHASH32))
+ .isEqualTo("15a69dc6501aa5ae17af037fe053f610");
+ assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isNull();
+ assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80");
+ assertThat(request.getBodySize()).isEqualTo(80);
+ }
+ }
+
+ @Test
+ void testUploadSSTableWithXXHashDigestAndSeed(@TempDir Path tempDirectory) throws Exception
+ {
+ Path fileToUpload = prepareFile(tempDirectory);
+ try (MockWebServer server = new MockWebServer())
+ {
+ server.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+ SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server);
+ client.uploadSSTableRequest(sidecarInstance,
+ "cycling",
+ "cyclist_name",
+ "0000-0000",
+ "nb-1-big-TOC.txt",
+ new XXHash32Digest("15a69dc6501aa5ae17af037fe053f610", "123456"),
+ fileToUpload.toString())
+ .get(30, TimeUnit.SECONDS);
+
+ assertThat(server.getRequestCount()).isEqualTo(1);
+ RecordedRequest request = server.takeRequest();
+ assertThat(request.getPath())
+ .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE
+ .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000")
+ .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "cycling")
+ .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name")
+ .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt"));
+ assertThat(request.getMethod()).isEqualTo("PUT");
+ assertThat(request.getHeader(CONTENT_XXHASH32))
+ .isEqualTo("15a69dc6501aa5ae17af037fe053f610");
+ assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isEqualTo("123456");
+ assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80");
+ assertThat(request.getBodySize()).isEqualTo(80);
+ }
+ }
+
+ @Test
void testStreamSSTableComponentWithNoRange() throws Exception
{
try (MockWebServer server = new MockWebServer())
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
new file mode 100644
index 0000000..1d4f432
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/Digest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.Map;
+
+/**
+ * Interface that represents a checksum digest
+ */
+public interface Digest
+{
+ /**
+ * @return headers to be used in the HTTP request
+ */
+ Map<String, String> headers();
+
+ /**
+ * @return the string representation of the digest
+ */
+ String value();
+
+ /**
+ * @return the name of the digest's algorithm
+ */
+ String algorithm();
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java
new file mode 100644
index 0000000..ba8b7b5
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/MD5Digest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implements the MD5 checksum digest
+ */
+public class MD5Digest implements Digest
+{
+ private final @NotNull String value;
+
+ /**
+ * Constructs a new MD5Digest with the provided MD5 {@code value}
+ *
+ * @param value the MD5 value
+ */
+ public MD5Digest(@NotNull String value)
+ {
+ this.value = Objects.requireNonNull(value, "value is required");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String value()
+ {
+ return value;
+ }
+
+ @Override
+ public String algorithm()
+ {
+ return "MD5";
+ }
+
+ /**
+ * @return MD5 headers for the Digest
+ */
+ @Override
+ public Map<String, String> headers()
+ {
+ return Collections.singletonMap(HttpHeaderNames.CONTENT_MD5.toString(), value);
+ }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
new file mode 100644
index 0000000..48127e4
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/XXHash32Digest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
+
+/**
+ * Implements the XXHash32 Digest
+ */
+public class XXHash32Digest implements Digest
+{
+ private final @NotNull String value;
+ private final @Nullable String seedHex;
+
+ /**
+ * Constructs a new XXHash32Digest with the provided XXHash32 {@code value}
+ *
+ * @param value the xxhash value
+ */
+ public XXHash32Digest(String value)
+ {
+ this(value, null);
+ }
+
+ /**
+ * Constructs a new instance with the provided XXHash {@code value} and the {@code seed} value.
+ *
+ * @param value the xxhash value
+ * @param seed the seed
+ */
+ public XXHash32Digest(String value, int seed)
+ {
+ this(value, Integer.toHexString(seed));
+ }
+
+ /**
+ * Constructs a new XXHash32Digest with the provided XXHash32 {@code value} and the seed value represented as
+ * a hexadecimal string
+ *
+ * @param value the xxhash value
+ * @param seedHex the value of the seed represented as a hexadecimal value
+ */
+ public XXHash32Digest(@NotNull String value, @Nullable String seedHex)
+ {
+ this.value = Objects.requireNonNull(value, "value is required");
+ this.seedHex = seedHex;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String value()
+ {
+ return value;
+ }
+
+ /**
+ * @return the optional seed in hexadecimal format
+ */
+ public @Nullable String seedHex()
+ {
+ return seedHex;
+ }
+
+ @Override
+ public String algorithm()
+ {
+ return "XXHash32";
+ }
+
+ /**
+ * @return XXHash headers for the digest
+ */
+ @Override
+ public Map<String, String> headers()
+ {
+ Map<String, String> headers = new HashMap<>();
+ headers.put(CONTENT_XXHASH32, value);
+ if (seedHex != null)
+ {
+ headers.put(CONTENT_XXHASH32_SEED, seedHex);
+ }
+ return headers;
+ }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java b/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
new file mode 100644
index 0000000..347ed59
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/http/SidecarHttpHeaderNames.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.http;
+
+/**
+ * Custom header names for sidecar
+ */
+public final class SidecarHttpHeaderNames
+{
+ /**
+ * {@code "cassandra-content-xxhash32"}
+ */
+ public static final String CONTENT_XXHASH32 = "cassandra-content-xxhash32";
+ /**
+ * {@code "cassandra-content-xxhash32-seed"}
+ */
+ public static final String CONTENT_XXHASH32_SEED = "cassandra-content-xxhash32-seed";
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java b/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
index cef1491..60cfbd2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
+++ b/src/main/java/org/apache/cassandra/sidecar/data/SSTableUploadRequest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.sidecar.data;
-import io.netty.handler.codec.http.HttpHeaderNames;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
import org.apache.cassandra.sidecar.common.data.SSTableUploads;
@@ -29,7 +28,6 @@
public class SSTableUploadRequest extends SSTableUploads
{
private final String component;
- private final String expectedChecksum;
/**
* Constructs an SSTableUploadRequest
@@ -37,16 +35,13 @@
* @param qualifiedTableName the qualified table name in Cassandra
* @param uploadId an identifier for the upload
* @param component SSTable component being uploaded
- * @param expectedChecksum expected hash value to check integrity of SSTable component uploaded
*/
public SSTableUploadRequest(QualifiedTableName qualifiedTableName,
String uploadId,
- String component,
- String expectedChecksum)
+ String component)
{
super(qualifiedTableName, uploadId);
this.component = component;
- this.expectedChecksum = expectedChecksum;
}
/**
@@ -58,14 +53,6 @@
}
/**
- * @return expected checksum value of SSTable component
- */
- public String expectedChecksum()
- {
- return this.expectedChecksum;
- }
-
- /**
* {@inheritDoc}
*/
public String toString()
@@ -75,7 +62,6 @@
", keyspace='" + keyspace() + '\'' +
", tableName='" + table() + '\'' +
", component='" + component + '\'' +
- ", expectedChecksum='" + expectedChecksum + '\'' +
'}';
}
@@ -90,7 +76,6 @@
{
return new SSTableUploadRequest(qualifiedTableName,
context.pathParam("uploadId"),
- context.pathParam("component"),
- context.request().getHeader(HttpHeaderNames.CONTENT_MD5.toString()));
+ context.pathParam("component"));
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 918d8c0..8828c8f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -115,11 +115,21 @@
*/
public static String checksum(File file) throws IOException
{
+ int seed = 0x9747b28c; // random seed for initializing
+ return checksum(file, seed);
+ }
+
+ /**
+ * @param file the file to use to perform the checksum
+ * @param seed the seed to use for the hasher
+ * @return the checksum hex string of the file's content. XXHash32 is employed as the hash algorithm.
+ */
+ public static String checksum(File file, int seed) throws IOException
+ {
try (FileInputStream fis = new FileInputStream(file))
{
// might have shared hashers with ThreadLocal
XXHashFactory factory = XXHashFactory.safeInstance();
- int seed = 0x9747b28c; // random seed for initializing
try (StreamingXXHash32 hasher = factory.newStreamingHash32(seed))
{
byte[] buffer = new byte[KB_512];
@@ -128,7 +138,7 @@
{
hasher.update(buffer, 0, len);
}
- return Long.toHexString(hasher.getValue());
+ return Long.toHexString(hasher.getValue());
}
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
index dc23141..9347f7f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
@@ -42,6 +42,8 @@
import org.apache.cassandra.sidecar.stats.SSTableStats;
import org.apache.cassandra.sidecar.stats.SidecarStats;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.DigestVerifier;
+import org.apache.cassandra.sidecar.utils.DigestVerifierFactory;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.utils.MetadataUtils;
import org.apache.cassandra.sidecar.utils.SSTableUploader;
@@ -62,6 +64,7 @@
private final SSTableUploadsPathBuilder uploadPathBuilder;
private final ConcurrencyLimiter limiter;
private final SSTableStats stats;
+ private final DigestVerifierFactory digestVerifierFactory;
/**
* Constructs a handler with the provided params.
@@ -74,6 +77,7 @@
* @param executorPools executor pools for blocking executions
* @param validator a validator instance to validate Cassandra-specific input
* @param sidecarStats an interface holding all stats related to main sidecar process
+ * @param digestVerifierFactory a factory of digest verifiers
*/
@Inject
protected SSTableUploadHandler(Vertx vertx,
@@ -83,7 +87,8 @@
SSTableUploadsPathBuilder uploadPathBuilder,
ExecutorPools executorPools,
CassandraInputValidator validator,
- SidecarStats sidecarStats)
+ SidecarStats sidecarStats,
+ DigestVerifierFactory digestVerifierFactory)
{
super(metadataFetcher, executorPools, validator);
this.fs = vertx.fileSystem();
@@ -92,6 +97,7 @@
this.uploadPathBuilder = uploadPathBuilder;
this.limiter = new ConcurrencyLimiter(configuration::concurrentUploadsLimit);
this.stats = sidecarStats.ssTableStats();
+ this.digestVerifierFactory = digestVerifierFactory;
}
/**
@@ -124,11 +130,14 @@
.compose(validRequest -> uploadPathBuilder.resolveStagingDirectory(host))
.compose(this::ensureSufficientSpaceAvailable)
.compose(v -> uploadPathBuilder.build(host, request))
- .compose(uploadDirectory -> uploader.uploadComponent(httpRequest,
- uploadDirectory,
- request.component(),
- request.expectedChecksum(),
- configuration.filePermissions()))
+ .compose(uploadDirectory -> {
+ DigestVerifier digestVerifier = digestVerifierFactory.verifier(httpRequest.headers());
+ return uploader.uploadComponent(httpRequest,
+ uploadDirectory,
+ request.component(),
+ digestVerifier,
+ configuration.filePermissions());
+ })
.compose(fs::props)
.onSuccess(fileProps -> {
long serviceTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index b1af4b7..31ab7b1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -96,8 +96,6 @@
import org.apache.cassandra.sidecar.stats.SidecarSchemaStats;
import org.apache.cassandra.sidecar.stats.SidecarStats;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
-import org.apache.cassandra.sidecar.utils.ChecksumVerifier;
-import org.apache.cassandra.sidecar.utils.MD5ChecksumVerifier;
import org.apache.cassandra.sidecar.utils.TimeProvider;
import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
@@ -430,13 +428,6 @@
@Provides
@Singleton
- public ChecksumVerifier checksumVerifier(Vertx vertx)
- {
- return new MD5ChecksumVerifier(vertx.fileSystem());
- }
-
- @Provides
- @Singleton
public SidecarVersionProvider sidecarVersionProvider()
{
return new SidecarVersionProvider("/sidecar.version");
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
new file mode 100644
index 0000000..790b54c
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.Digest;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH;
+
+/**
+ * Provides basic functionality to perform digest validations using {@link AsyncFile}
+ *
+ * @param <D> the Digest type
+ */
+public abstract class AsyncFileDigestVerifier<D extends Digest> implements DigestVerifier
+{
+ public static final int DEFAULT_READ_BUFFER_SIZE = 512 * 1024; // 512KiB
+ protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+ protected final FileSystem fs;
+ protected final D digest;
+
+ protected AsyncFileDigestVerifier(FileSystem fs, D digest)
+ {
+ this.fs = fs;
+ this.digest = Objects.requireNonNull(digest, "digest is required");
+ }
+
+ /**
+ * @param filePath path to SSTable component
+ * @return a future String with the component path if verification is a success, otherwise a failed future
+ */
+ @Override
+ public Future<String> verify(String filePath)
+ {
+ logger.debug("Validating {}. expected_digest={}", digest.algorithm(), digest.value());
+
+ return fs.open(filePath, new OpenOptions())
+ .compose(this::calculateDigest)
+ .compose(computedDigest -> {
+ if (!computedDigest.equals(digest.value()))
+ {
+ logger.error("Digest mismatch. computed_digest={}, expected_digest={}, algorithm={}",
+ computedDigest, digest.value(), digest.algorithm());
+ return Future.failedFuture(new HttpException(CHECKSUM_MISMATCH.code(),
+ String.format("Digest mismatch. "
+ + "expected_digest=%s, "
+ + "algorithm=%s",
+ digest.value(),
+ digest.algorithm())));
+ }
+ return Future.succeededFuture(filePath);
+ });
+ }
+
+ /**
+ * Returns a future with the calculated digest for the provided {@link AsyncFile file}.
+ *
+ * @param asyncFile the async file to use for digest calculation
+ * @return a future with the computed digest for the provided {@link AsyncFile file}
+ */
+ protected abstract Future<String> calculateDigest(AsyncFile asyncFile);
+
+ protected void readFile(AsyncFile file, Promise<String> result, Handler<Buffer> onBufferAvailable,
+ Handler<Void> onReadComplete)
+ {
+ // Make sure to close the file when complete
+ result.future().onComplete(ignored -> file.end());
+ file.pause()
+ .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE)
+ .handler(onBufferAvailable)
+ .endHandler(onReadComplete)
+ .exceptionHandler(cause -> {
+ logger.error("Error while calculating the {} digest", digest.algorithm(), cause);
+ result.fail(cause);
+ })
+ .resume();
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
similarity index 64%
rename from src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
rename to src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
index e338642..07bb741 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/ChecksumVerifier.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifier.java
@@ -23,16 +23,15 @@
/**
* Interface to verify integrity of SSTables uploaded.
* <p>
- * Note: If checksum calculations of multiple files are happening at the same time, we would want to limit concurrent
- * checksum calculations. Since {@link ChecksumVerifier} is currently used only by upload handler, we are not
- * introducing another limit here. Concurrent uploads limit should limit concurrent checksum calculations as well.
+ * Note: If digest calculations of multiple files are happening at the same time, we would want to limit concurrent
+ * digest calculations. Since {@link DigestVerifier} is currently used only by upload handler, we are not
+ * introducing another limit here. Concurrent uploads limit should limit concurrent digest calculations as well.
*/
-public interface ChecksumVerifier
+public interface DigestVerifier
{
/**
- * @param checksum expected checksum value
- * @param filePath path to SSTable component
- * @return String component path, if verification is a success, else a failed future is returned
+ * @param filePath path to SSTable component
+ * @return a future String with the component path if verification is a success, otherwise a failed future
*/
- Future<String> verify(String checksum, String filePath);
+ Future<String> verify(String filePath);
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java
new file mode 100644
index 0000000..3dd64a2
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.FileSystem;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+
+/**
+ * A factory class that returns the {@link DigestVerifier} instance.
+ */
+@Singleton
+public class DigestVerifierFactory
+{
+ @VisibleForTesting
+ static final DigestVerifier FALLBACK_VERIFIER = Future::succeededFuture;
+ private final FileSystem fs;
+
+
+ /**
+ * Constructs a new factory
+ *
+ * @param vertx the vertx instance
+ */
+ @Inject
+ public DigestVerifierFactory(Vertx vertx)
+ {
+ this.fs = vertx.fileSystem();
+ }
+
+    /**
+ * Returns the first match for a {@link DigestVerifier} from the registered list of verifiers. If none of the
+     * verifiers matches, a no-op verifier is returned.
+ *
+ * @param headers the request headers used to test whether a {@link DigestVerifier} can be used to verify the
+ * request
+ * @return the first match for a {@link DigestVerifier} from the registered list of verifiers, or a no-op
+ * verifier if none match
+ */
+ public DigestVerifier verifier(MultiMap headers)
+ {
+ if (headers.contains(CONTENT_XXHASH32))
+ {
+ return XXHash32DigestVerifier.create(fs, headers);
+ }
+ else if (headers.contains(HttpHeaderNames.CONTENT_MD5.toString()))
+ {
+ return MD5DigestVerifier.create(fs, headers);
+ }
+        // Fallback to no-op verifier
+ return FALLBACK_VERIFIER;
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
deleted file mode 100644
index 132ff10..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.utils;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Base64;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.vertx.core.Future;
-import io.vertx.core.Promise;
-import io.vertx.core.file.AsyncFile;
-import io.vertx.core.file.FileSystem;
-import io.vertx.core.file.OpenOptions;
-import io.vertx.ext.web.handler.HttpException;
-import org.jetbrains.annotations.VisibleForTesting;
-
-import static org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH;
-
-/**
- * Implementation of {@link ChecksumVerifier}. Here we use MD5 implementation of {@link java.security.MessageDigest}
- * for calculating checksum. And match the calculated checksum with expected checksum obtained from request.
- */
-public class MD5ChecksumVerifier implements ChecksumVerifier
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(MD5ChecksumVerifier.class);
- public static final int DEFAULT_READ_BUFFER_SIZE = 64 * 1024; // 64KiB
- private final FileSystem fs;
-
- public MD5ChecksumVerifier(FileSystem fs)
- {
- this.fs = fs;
- }
-
- public Future<String> verify(String expectedChecksum, String filePath)
- {
- if (expectedChecksum == null)
- {
- return Future.succeededFuture(filePath);
- }
-
- LOGGER.debug("Validating MD5. expected_checksum={}", expectedChecksum);
-
- return fs.open(filePath, new OpenOptions())
- .compose(this::calculateMD5)
- .compose(computedChecksum -> {
- if (!expectedChecksum.equals(computedChecksum))
- {
- LOGGER.error("Checksum mismatch. computed_checksum={}, expected_checksum={}, algorithm=MD5",
- computedChecksum, expectedChecksum);
- return Future.failedFuture(new HttpException(CHECKSUM_MISMATCH.code(),
- String.format("Checksum mismatch. "
- + "expected_checksum=%s, "
- + "algorithm=MD5",
- expectedChecksum)));
- }
- return Future.succeededFuture(filePath);
- });
- }
-
- @VisibleForTesting
- Future<String> calculateMD5(AsyncFile file)
- {
- MessageDigest digest;
- try
- {
- digest = MessageDigest.getInstance("MD5");
- }
- catch (NoSuchAlgorithmException e)
- {
- return Future.failedFuture(e);
- }
-
- Promise<String> result = Promise.promise();
- file.pause()
- .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE)
- .handler(buf -> digest.update(buf.getBytes()))
- .endHandler(_v -> {
- result.complete(Base64.getEncoder().encodeToString(digest.digest()));
- file.end();
- })
- .exceptionHandler(cause -> {
- LOGGER.error("Error while calculating MD5 checksum", cause);
- result.fail(cause);
- file.end();
- })
- .resume();
- return result.future();
- }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java
new file mode 100644
index 0000000..ed4218d
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifier.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Implementation of {@link DigestVerifier}. Here we use MD5 implementation of {@link java.security.MessageDigest}
+ * for calculating digest and match the calculated digest with expected digest obtained from request.
+ */
+public class MD5DigestVerifier extends AsyncFileDigestVerifier<MD5Digest>
+{
+ protected MD5DigestVerifier(FileSystem fs, MD5Digest digest)
+ {
+ super(fs, digest);
+ }
+
+ public static DigestVerifier create(FileSystem fs, MultiMap headers)
+ {
+ MD5Digest md5Digest = new MD5Digest(headers.get(HttpHeaderNames.CONTENT_MD5.toString()));
+ return new MD5DigestVerifier(fs, md5Digest);
+ }
+
+ @Override
+ @VisibleForTesting
+ protected Future<String> calculateDigest(AsyncFile file)
+ {
+ MessageDigest digest;
+ try
+ {
+ digest = MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ return Future.failedFuture(e);
+ }
+
+ Promise<String> result = Promise.promise();
+
+ readFile(file, result, buf -> digest.update(buf.getBytes()),
+ _v -> result.complete(Base64.getEncoder().encodeToString(digest.digest())));
+
+ return result.future();
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
index 465f0a4..e7f59a4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
@@ -39,6 +39,7 @@
import io.vertx.core.file.OpenOptions;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
+import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
/**
* A class that handles SSTable Uploads
@@ -50,23 +51,19 @@
private static final String DEFAULT_TEMP_SUFFIX = ".tmp";
private final FileSystem fs;
- private final ChecksumVerifier checksumVerifier;
private final SidecarRateLimiter rateLimiter;
/**
* Constructs an instance of {@link SSTableUploader} with provided params for uploading an SSTable component.
*
- * @param vertx Vertx reference
- * @param checksumVerifier verifier for checking integrity of upload
- * @param rateLimiter rate limiter for uploading SSTable components
+ * @param vertx Vertx reference
+ * @param rateLimiter rate limiter for uploading SSTable components
*/
@Inject
public SSTableUploader(Vertx vertx,
- ChecksumVerifier checksumVerifier,
@Named("IngressFileRateLimiter") SidecarRateLimiter rateLimiter)
{
this.fs = vertx.fileSystem();
- this.checksumVerifier = checksumVerifier;
this.rateLimiter = rateLimiter;
}
@@ -76,14 +73,14 @@
* @param readStream server request from which file upload is acquired
* @param uploadDirectory the absolute path to the upload directory in the target {@code fs}
* @param componentFileName the file name of the component
- * @param expectedChecksum for verifying upload integrity, passed in through request
+ * @param digestVerifier the digest verifier instance
* @param filePermissions specifies the posix file permissions used to create the SSTable file
* @return path of SSTable component to which data was uploaded
*/
public Future<String> uploadComponent(ReadStream<Buffer> readStream,
String uploadDirectory,
String componentFileName,
- String expectedChecksum,
+ DigestVerifier digestVerifier,
String filePermissions)
{
@@ -92,15 +89,16 @@
return fs.mkdirs(uploadDirectory) // ensure the parent directory is created
.compose(v -> createTempFile(uploadDirectory, componentFileName, filePermissions))
- .compose(tempFilePath -> streamAndVerify(readStream, tempFilePath, expectedChecksum))
+ .compose(tempFilePath -> streamAndVerify(readStream, tempFilePath, digestVerifier))
.compose(verifiedTempFilePath -> moveAtomicallyWithFallBack(verifiedTempFilePath, targetPath));
}
- private Future<String> streamAndVerify(ReadStream<Buffer> readStream, String tempFilePath, String expectedChecksum)
+ private Future<String> streamAndVerify(ReadStream<Buffer> readStream, String tempFilePath,
+ DigestVerifier digestVerifier)
{
// pipe read stream to temp file
return streamToFile(readStream, tempFilePath)
- .compose(v -> checksumVerifier.verify(expectedChecksum, tempFilePath))
+ .compose(v -> digestVerifier.verify(tempFilePath))
.onFailure(throwable -> fs.delete(tempFilePath));
}
@@ -128,7 +126,9 @@
LOGGER.debug("Moving from={} to={}", source, target);
return fs.move(source, target, new CopyOptions().setAtomicMove(true))
.recover(cause -> {
- if (hasCause(cause, AtomicMoveNotSupportedException.class, 10))
+ Exception atomicMoveNotSupportedException =
+ ThrowableUtils.getCause(cause, AtomicMoveNotSupportedException.class);
+ if (atomicMoveNotSupportedException != null)
{
LOGGER.warn("Failed to perform atomic move from={} to={}", source, target, cause);
return fs.move(source, target, new CopyOptions().setAtomicMove(false));
@@ -139,33 +139,6 @@
}
/**
- * Returns true if a cause of type {@code type} is found in the stack trace before exceeding the {@code depth}
- *
- * @param cause the original cause
- * @param type the exception type to test
- * @param depth the maximum depth to check in the stack trace
- * @return true if the exception of type {@code type} exists in the stacktrace, false otherwise
- */
- private static boolean hasCause(Throwable cause, Class<? extends Throwable> type, int depth)
- {
- int i = 0;
- while (i < depth)
- {
- if (cause == null)
- return false;
-
- if (type.isInstance(cause))
- return true;
-
- cause = cause.getCause();
-
- i++;
- }
- return false;
- }
-
-
- /**
* A {@link WriteStream} implementation that supports rate limiting.
*/
public static class RateLimitedWriteStream implements WriteStream<Buffer>
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java
new file mode 100644
index 0000000..b1944c6
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifier.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import net.jpountz.xxhash.StreamingXXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
+
+/**
+ * Implementation of {@link DigestVerifier} to calculate the digest and match the calculated digest
+ * with the expected digest.
+ */
+public class XXHash32DigestVerifier extends AsyncFileDigestVerifier<XXHash32Digest>
+{
+ protected XXHash32DigestVerifier(FileSystem fs, XXHash32Digest digest)
+ {
+ super(fs, digest);
+ }
+
+ public static XXHash32DigestVerifier create(FileSystem fs, MultiMap headers)
+ {
+ XXHash32Digest digest = new XXHash32Digest(headers.get(CONTENT_XXHASH32), headers.get(CONTENT_XXHASH32_SEED));
+ return new XXHash32DigestVerifier(fs, digest);
+ }
+
+ @Override
+ @VisibleForTesting
+ protected Future<String> calculateDigest(AsyncFile file)
+ {
+ Promise<String> result = Promise.promise();
+ Future<String> future = result.future();
+
+ // might have shared hashers with ThreadLocal
+ XXHashFactory factory = XXHashFactory.safeInstance();
+
+ int seed = maybeGetSeedOrDefault();
+ StreamingXXHash32 hasher = factory.newStreamingHash32(seed);
+
+ future.onComplete(ignored -> hasher.close());
+
+ readFile(file, result, buf -> {
+ byte[] bytes = buf.getBytes();
+ hasher.update(bytes, 0, bytes.length);
+ },
+ _v -> result.complete(Long.toHexString(hasher.getValue())));
+
+ return future;
+ }
+
+ protected int maybeGetSeedOrDefault()
+ {
+ String seedHex = digest.seedHex();
+ if (seedHex != null)
+ {
+ return (int) Long.parseLong(seedHex, 16);
+ }
+ return 0x9747b28c; // random seed for initializing
+ }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
index 49554c6..2694fb4 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.sidecar.routes.sstableuploads;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -27,7 +26,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -45,6 +43,9 @@
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.Digest;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
import org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus;
import org.apache.cassandra.sidecar.snapshots.SnapshotUtils;
import org.assertj.core.data.Percentage;
@@ -58,6 +59,7 @@
import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE;
import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static org.apache.cassandra.sidecar.utils.TestFileUtils.prepareTestFile;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@@ -75,7 +77,7 @@
void testUploadWithoutMd5_expectSuccessfulUpload(VertxTestContext context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false);
}
@@ -83,15 +85,63 @@
void testUploadWithCorrectMd5_expectSuccessfulUpload(VertxTestContext context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-md5.db", "jXd/OF09/siBXSD3SWAm3A==",
- Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false);
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-md5.db",
+ new MD5Digest("jXd/OF09/siBXSD3SWAm3A=="),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ HttpResponseStatus.OK.code(),
+ false);
+ }
+
+ @Test
+ void testUploadWithCorrectXXHash_expectSuccessfulUpload(VertxTestContext context) throws IOException
+ {
+ UUID uploadId = UUID.randomUUID();
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-xxhash.db",
+ new XXHash32Digest("7a28edc0"),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ HttpResponseStatus.OK.code(),
+ false);
+ }
+
+ @Test
+ void testUploadWithCorrectXXHashAndCustomSeed_expectSuccessfulUpload(VertxTestContext context) throws IOException
+ {
+ UUID uploadId = UUID.randomUUID();
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-xxhash.db",
+ new XXHash32Digest("ffffffffb9510d6b", "55555555"),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ HttpResponseStatus.OK.code(),
+ false);
}
@Test
void testUploadWithIncorrectMd5_expectErrorCode(VertxTestContext context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-md5.db", "incorrectMd5",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-md5.db",
+ new MD5Digest("incorrectMd5"),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
+ false);
+ }
+
+ @Test
+ void testUploadWithIncorrectXXHash_expectErrorCode(VertxTestContext context) throws IOException
+ {
+ UUID uploadId = UUID.randomUUID();
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-xxhash.db",
+ new XXHash32Digest("incorrectXXHash"),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
+ false);
+ }
+
+ @Test
+ void testUploadWithIncorrectXXHashAndCustomSeed_expectErrorCode(VertxTestContext context) throws IOException
+ {
+ UUID uploadId = UUID.randomUUID();
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-xxhash.db",
+ new XXHash32Digest("7a28edc0", "bad"),
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
SidecarHttpResponseStatus.CHECKSUM_MISMATCH.code(),
false);
@@ -101,26 +151,27 @@
void testInvalidFileName_expectErrorCode(VertxTestContext context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "ks$tbl-me-4-big-Data.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "ks$tbl-me-4-big-Data.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
false);
}
@Test
- void testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context) throws IOException
+ void testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context)
{
UUID uploadId = UUID.randomUUID();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-content-length.db",
- "jXd/OF09/siBXSD3SWAm3A==", 0, HttpResponseStatus.OK.code(), false);
+ new MD5Digest("jXd/OF09/siBXSD3SWAm3A=="), 0, HttpResponseStatus.OK.code(), false);
}
@Test
- void testUploadTimeout_expectTimeoutError(VertxTestContext context) throws IOException
+ void testUploadTimeout_expectTimeoutError(VertxTestContext context)
{
// if we send more than actual length, vertx goes hung, probably looking for more data than exists in the file,
// we should see timeout error in this case
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-higher-content-length.db", "", 1000, -1, true);
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-higher-content-length.db", null, 1000, -1,
+ true);
}
@Test
@@ -128,14 +179,14 @@
{
UUID uploadId = UUID.randomUUID();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-lesser-content-length.db",
- "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
+ null, Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
false);
}
@Test
void testInvalidUploadId(VertxTestContext context) throws IOException
{
- sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl", "with-lesser-content-length.db", "",
+ sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl", "with-lesser-content-length.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
false, response -> {
JsonObject error = response.bodyAsJsonObject();
@@ -149,7 +200,7 @@
void testInvalidKeyspace(VertxTestContext context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace", "tbl", "with-lesser-content-length.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace", "tbl", "with-lesser-content-length.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
false);
}
@@ -158,7 +209,7 @@
void testInvalidTable(VertxTestContext context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "invalidTableName", "with-lesser-content-length.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "invalidTableName", "with-lesser-content-length.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
false);
}
@@ -169,7 +220,7 @@
when(mockSSTableUploadConfiguration.minimumSpacePercentageRequired()).thenReturn(100F);
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.INSUFFICIENT_STORAGE.code(), false);
}
@@ -180,7 +231,7 @@
when(mockSSTableUploadConfiguration.concurrentUploadsLimit()).thenReturn(0);
UUID uploadId = UUID.randomUUID();
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.TOO_MANY_REQUESTS.code(), false);
}
@@ -193,13 +244,13 @@
UUID uploadId = UUID.randomUUID();
CountDownLatch latch = new CountDownLatch(1);
sendUploadRequestAndVerify(latch, context, uploadId.toString(), "invalidKeyspace", "tbl",
- "without-md5.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ "without-md5.db", null, Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(), false);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
// checking if permits were released after bad requests
- sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false);
}
@@ -209,7 +260,7 @@
String uploadId = UUID.randomUUID().toString();
when(mockSSTableUploadConfiguration.filePermissions()).thenReturn("rwxr-xr-x");
- sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "without-md5.db", "",
+ sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "without-md5.db", null,
Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(),
false, response -> {
@@ -247,7 +298,7 @@
long startTime = System.nanoTime();
String uploadId = UUID.randomUUID().toString();
- sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "1MB-File-Data.db", "",
+ sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "1MB-File-Data.db", null,
Files.size(largeFilePath), HttpResponseStatus.OK.code(),
false, response -> {
@@ -269,7 +320,7 @@
long startTime = System.nanoTime();
String uploadId = UUID.randomUUID().toString();
- sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "1MB-File-Data.db", "",
+ sendUploadRequestAndVerify(null, context, uploadId, "ks", "tbl", "1MB-File-Data.db", null,
Files.size(largeFilePath), HttpResponseStatus.OK.code(),
false, response -> {
@@ -286,13 +337,13 @@
String keyspace,
String tableName,
String targetFileName,
- String expectedMd5,
+ Digest expectedDigest,
long fileLength,
int expectedRetCode,
boolean expectTimeout)
{
sendUploadRequestAndVerify(null, context, uploadId.toString(), keyspace, tableName, targetFileName,
- expectedMd5, fileLength, expectedRetCode, expectTimeout);
+ expectedDigest, fileLength, expectedRetCode, expectTimeout);
}
private void sendUploadRequestAndVerify(CountDownLatch latch,
@@ -301,7 +352,7 @@
String keyspace,
String tableName,
String targetFileName,
- String expectedMd5,
+ Digest expectedDigest,
long fileLength,
int expectedRetCode,
boolean expectTimeout)
@@ -312,7 +363,7 @@
keyspace,
tableName,
targetFileName,
- expectedMd5,
+ expectedDigest,
fileLength,
expectedRetCode,
expectTimeout,
@@ -326,7 +377,7 @@
String keyspace,
String tableName,
String targetFileName,
- String expectedMd5,
+ Digest expectedDigest,
long fileLength,
int expectedRetCode,
boolean expectTimeout,
@@ -337,9 +388,9 @@
String testRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" + keyspace
+ "/tables/" + tableName + "/components/" + targetFileName;
HttpRequest<Buffer> req = client.put(server.actualPort(), "localhost", testRoute);
- if (!expectedMd5.isEmpty())
+ if (expectedDigest != null)
{
- req.putHeader(HttpHeaderNames.CONTENT_MD5.toString(), expectedMd5);
+ req.headers().addAll(expectedDigest.headers());
}
if (fileLength != 0)
{
@@ -383,24 +434,4 @@
client.close();
});
}
-
- static Path prepareTestFile(Path directory, String fileName, long sizeInBytes) throws IOException
- {
- Path filePath = directory.resolve(fileName);
- Files.deleteIfExists(filePath);
-
- byte[] buffer = new byte[1024];
- try (OutputStream outputStream = Files.newOutputStream(filePath))
- {
- int written = 0;
- while (written < sizeInBytes)
- {
- ThreadLocalRandom.current().nextBytes(buffer);
- int toWrite = (int) Math.min(buffer.length, sizeInBytes - written);
- outputStream.write(buffer, 0, toWrite);
- written += toWrite;
- }
- }
- return filePath;
- }
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
new file mode 100644
index 0000000..810e015
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/DigestVerifierFactoryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.impl.headers.HeadersMultiMap;
+
+import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
+import static org.apache.cassandra.sidecar.utils.DigestVerifierFactory.FALLBACK_VERIFIER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link DigestVerifierFactory}
+ */
+class DigestVerifierFactoryTest
+{
+ MultiMap options;
+ Vertx vertx = Vertx.vertx();
+
+ @BeforeEach
+ void setup()
+ {
+ options = new HeadersMultiMap();
+ }
+
+
+ @Test
+ void testEmptyFactoryReturnsFallbackVerifier()
+ {
+ DigestVerifier verifier = new DigestVerifierFactory(vertx).verifier(options);
+        assertThat(verifier).as("should fall back to the fallback verifier when no verifiers are configured")
+ .isNotNull()
+ .isSameAs(FALLBACK_VERIFIER);
+ }
+
+ @Test
+ void testMd5Verifier()
+ {
+ options.set("content-md5", "md5-header");
+ DigestVerifier verifier = new DigestVerifierFactory(vertx).verifier(options);
+
+ assertThat(verifier).as("MD5DigestVerifier can verify MD5 content headers")
+ .isNotNull()
+ .isInstanceOf(MD5DigestVerifier.class);
+ }
+
+ @Test
+ void testXXHashVerifier()
+ {
+ options.set(CONTENT_XXHASH32, "xxhash-header");
+ DigestVerifier verifier = new DigestVerifierFactory(vertx).verifier(options);
+
+ assertThat(verifier).as("XXHashDigestVerifier can verify XXHash content headers")
+ .isNotNull()
+ .isInstanceOf(XXHash32DigestVerifier.class);
+ }
+
+ @Test
+ void testFirstVerifierTakesPrecedence()
+ {
+ options.set("content-md5", "md5-header")
+ .set(CONTENT_XXHASH32, "xxhash-header");
+ DigestVerifier verifier = new DigestVerifierFactory(vertx).verifier(options);
+
+ assertThat(verifier).as("XXHashDigestVerifier is selected when both headers are present")
+ .isNotNull()
+ .isInstanceOf(XXHash32DigestVerifier.class);
+ }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
deleted file mode 100644
index 8eb5680..0000000
--- a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.utils;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Path;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Base64;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import io.vertx.core.Future;
-import io.vertx.core.Vertx;
-import io.vertx.core.file.AsyncFile;
-import io.vertx.core.file.FileSystem;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/**
- * Unit tests for {@link MD5ChecksumVerifier}
- */
-class MD5ChecksumVerifierTest
-{
- static Vertx vertx;
- static ExposeAsyncFileMD5ChecksumVerifier verifier;
-
- @TempDir
- Path tempDir;
-
- @BeforeAll
- static void setup()
- {
- vertx = Vertx.vertx();
- verifier = new ExposeAsyncFileMD5ChecksumVerifier(vertx.fileSystem());
- }
-
- @Test
- void testFileDescriptorsClosedWithValidChecksum() throws IOException, NoSuchAlgorithmException,
- InterruptedException
- {
- byte[] randomBytes = generateRandomBytes();
- Path randomFilePath = writeBytesToRandomFile(randomBytes);
- String expectedChecksum = Base64.getEncoder()
- .encodeToString(MessageDigest.getInstance("MD5")
- .digest(randomBytes));
-
- runTestScenario(randomFilePath, expectedChecksum);
- }
-
- @Test
- void testFileDescriptorsClosedWithInvalidChecksum() throws IOException, InterruptedException
- {
- Path randomFilePath = writeBytesToRandomFile(generateRandomBytes());
- runTestScenario(randomFilePath, "invalid");
- }
-
- private void runTestScenario(Path filePath, String checksum) throws InterruptedException
- {
- CountDownLatch latch = new CountDownLatch(1);
- verifier.verify(checksum, filePath.toAbsolutePath().toString())
- .onComplete(complete -> latch.countDown());
-
- assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
-
- assertThat(verifier.file).isNotNull();
- // we can't close the file if it's already closed, so we expect the exception here
- assertThatThrownBy(() -> verifier.file.end())
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("File handle is closed");
- }
-
- private byte[] generateRandomBytes()
- {
- byte[] bytes = new byte[1024];
- ThreadLocalRandom.current().nextBytes(bytes);
- return bytes;
- }
-
- private Path writeBytesToRandomFile(byte[] bytes) throws IOException
- {
- Path tempPath = tempDir.resolve("random-file.txt");
- try (RandomAccessFile writer = new RandomAccessFile(tempPath.toFile(), "rw"))
- {
- writer.write(bytes);
- }
- return tempPath;
- }
-
- /**
- * Class that extends from {@link MD5ChecksumVerifier} for testing purposes and holds a reference to the
- * {@link AsyncFile} to ensure that the file has been closed.
- */
- static class ExposeAsyncFileMD5ChecksumVerifier extends MD5ChecksumVerifier
- {
- AsyncFile file;
-
- public ExposeAsyncFileMD5ChecksumVerifier(FileSystem fs)
- {
- super(fs);
- }
-
- @Override
- Future<String> calculateMD5(AsyncFile file)
- {
- this.file = file;
- return super.calculateMD5(file);
- }
- }
-}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
new file mode 100644
index 0000000..550ef8e
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/MD5DigestVerifierTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.common.data.MD5Digest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link MD5DigestVerifier}
+ */
+class MD5DigestVerifierTest
+{
+ static Vertx vertx;
+
+ @TempDir
+ Path tempDir;
+
+ @BeforeAll
+ static void setup()
+ {
+ vertx = Vertx.vertx();
+ }
+
+ @Test
+ void testFileDescriptorsClosedWithValidDigest() throws IOException, NoSuchAlgorithmException,
+ InterruptedException
+ {
+ Path randomFilePath = TestFileUtils.prepareTestFile(tempDir, "random-file.txt", 1024);
+ byte[] randomBytes = Files.readAllBytes(randomFilePath);
+ String expectedDigest = Base64.getEncoder()
+ .encodeToString(MessageDigest.getInstance("MD5")
+ .digest(randomBytes));
+
+ runTestScenario(randomFilePath, expectedDigest);
+ }
+
+ @Test
+ void testFileDescriptorsClosedWithInvalidDigest() throws IOException, InterruptedException
+ {
+ Path randomFilePath = TestFileUtils.prepareTestFile(tempDir, "random-file.txt", 1024);
+ runTestScenario(randomFilePath, "invalid");
+ }
+
+ private void runTestScenario(Path filePath, String digest) throws InterruptedException
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ ExposeAsyncFileMD5DigestVerifier verifier = newVerifier(new MD5Digest(digest));
+ verifier.verify(filePath.toAbsolutePath().toString())
+ .onComplete(complete -> latch.countDown());
+
+ assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
+
+ assertThat(verifier.file).isNotNull();
+ // we can't close the file if it's already closed, so we expect the exception here
+ assertThatThrownBy(() -> verifier.file.end())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("File handle is closed");
+ }
+
+ static ExposeAsyncFileMD5DigestVerifier newVerifier(MD5Digest digest)
+ {
+ return new ExposeAsyncFileMD5DigestVerifier(vertx.fileSystem(), digest);
+ }
+
+ /**
+ * Class that extends from {@link MD5DigestVerifier} for testing purposes and holds a reference to the
+ * {@link AsyncFile} to ensure that the file has been closed.
+ */
+ static class ExposeAsyncFileMD5DigestVerifier extends MD5DigestVerifier
+ {
+ AsyncFile file;
+
+ public ExposeAsyncFileMD5DigestVerifier(FileSystem fs, MD5Digest md5Digest)
+ {
+ super(fs, md5Digest);
+ }
+
+ @Override
+ protected Future<String> calculateDigest(AsyncFile file)
+ {
+ this.file = file;
+ return super.calculateDigest(file);
+ }
+ }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java b/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java
new file mode 100644
index 0000000..195ef34
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/TestFileUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * File utilities for tests
+ */
+public class TestFileUtils
+{
+ /**
+     * Writes random data to a file with name {@code fileName} under the specified {@code directory} with
+ * the specified size in bytes.
+ *
+     * @param directory   the directory where the test file will be created
+ * @param fileName the name of the desired file to create
+     * @param sizeInBytes the size of the file in bytes
+ * @return the path of the file that was recently created
+ * @throws IOException when file creation or writing to the file fails
+ */
+ public static Path prepareTestFile(Path directory, String fileName, long sizeInBytes) throws IOException
+ {
+ Path filePath = directory.resolve(fileName);
+ Files.deleteIfExists(filePath);
+
+ byte[] buffer = new byte[1024];
+ try (OutputStream outputStream = Files.newOutputStream(filePath))
+ {
+ int written = 0;
+ while (written < sizeInBytes)
+ {
+ ThreadLocalRandom.current().nextBytes(buffer);
+ int toWrite = (int) Math.min(buffer.length, sizeInBytes - written);
+ outputStream.write(buffer, 0, toWrite);
+ written += toWrite;
+ }
+ }
+ return filePath;
+ }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
new file mode 100644
index 0000000..ff7566e
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/XXHash32DigestVerifierTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.XXHash32Digest;
+import org.assertj.core.api.InstanceOfAssertFactories;
+
+import static org.apache.cassandra.sidecar.restore.RestoreJobUtil.checksum;
+import static org.assertj.core.api.Assertions.as;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.from;
+
+/**
+ * Unit tests for {@link XXHash32DigestVerifier}
+ */
+class XXHash32DigestVerifierTest
+{
+ static Vertx vertx;
+
+ @TempDir
+ static Path tempDir;
+ static Path randomFilePath;
+
+ @BeforeAll
+ static void setup() throws IOException
+ {
+ vertx = Vertx.vertx();
+
+ randomFilePath = TestFileUtils.prepareTestFile(tempDir, "random-file.txt", 1024);
+ }
+
+ @Test
+ void failsWhenDigestIsNull()
+ {
+ assertThatNullPointerException().isThrownBy(() -> newVerifier(null)).withMessage("digest is required");
+ }
+
+ @Test
+ void testFileDescriptorsClosedWithValidDigest() throws IOException, InterruptedException
+ {
+ XXHash32Digest digest = new XXHash32Digest(checksum(randomFilePath.toFile()));
+ runTestScenario(randomFilePath, digest, false);
+ }
+
+ @Test
+ void failsWithNonDefaultSeedAndSeedIsNotPassedAsAnOption() throws IOException, InterruptedException
+ {
+ XXHash32Digest digest = new XXHash32Digest(checksum(randomFilePath.toFile(), 0x55555555));
+ runTestScenario(randomFilePath, digest, true);
+ }
+
+ @Test
+ void testWithCustomSeed() throws IOException, InterruptedException
+ {
+ int seed = 0x55555555;
+ XXHash32Digest digest = new XXHash32Digest(checksum(randomFilePath.toFile(), seed), seed);
+ runTestScenario(randomFilePath, digest, false);
+ }
+
+ @Test
+ void testFileDescriptorsClosedWithInvalidDigest() throws InterruptedException
+ {
+ runTestScenario(randomFilePath, new XXHash32Digest("invalid"), true);
+ }
+
+ private void runTestScenario(Path filePath, XXHash32Digest digest,
+ boolean errorExpectedDuringValidation) throws InterruptedException
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ ExposeAsyncFileXXHash32DigestVerifier verifier = newVerifier(digest);
+ verifier.verify(filePath.toAbsolutePath().toString())
+ .onComplete(complete -> {
+ if (errorExpectedDuringValidation)
+ {
+ assertThat(complete.failed()).isTrue();
+ assertThat(complete.cause())
+ .isInstanceOf(HttpException.class)
+ .extracting(from(t -> ((HttpException) t).getPayload()), as(InstanceOfAssertFactories.STRING))
+ .contains("Digest mismatch. expected_digest=" + digest.value());
+ }
+ else
+ {
+ assertThat(complete.failed()).isFalse();
+ assertThat(complete.result()).endsWith("random-file.txt");
+ }
+ latch.countDown();
+ });
+
+ assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
+
+ assertThat(verifier.file).isNotNull();
+ // we can't close the file if it's already closed, so we expect the exception here
+ assertThatThrownBy(() -> verifier.file.end())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("File handle is closed");
+ }
+
+ static ExposeAsyncFileXXHash32DigestVerifier newVerifier(XXHash32Digest digest)
+ {
+ return new ExposeAsyncFileXXHash32DigestVerifier(vertx.fileSystem(), digest);
+ }
+
+ /**
+ * Class that extends from {@link XXHash32DigestVerifier} for testing purposes and holds a reference to the
+ * {@link AsyncFile} to ensure that the file has been closed.
+ */
+ static class ExposeAsyncFileXXHash32DigestVerifier extends XXHash32DigestVerifier
+ {
+ AsyncFile file;
+
+ public ExposeAsyncFileXXHash32DigestVerifier(FileSystem fs, XXHash32Digest digest)
+ {
+ super(fs, digest);
+ }
+
+ @Override
+ protected Future<String> calculateDigest(AsyncFile file)
+ {
+ this.file = file;
+ return super.calculateDigest(file);
+ }
+ }
+}