| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.sidecar.client; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.charset.Charset; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.StandardCopyOption; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.io.TempDir; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.ValueSource; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import io.netty.handler.codec.http.HttpHeaderNames; |
| import io.netty.handler.codec.http.HttpHeaderValues; |
| import okhttp3.mockwebserver.MockResponse; |
| import okhttp3.mockwebserver.MockWebServer; |
| import okhttp3.mockwebserver.RecordedRequest; |
| import okhttp3.mockwebserver.SocketPolicy; |
| import okio.Buffer; |
| import okio.Okio; |
| import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException; |
| import org.apache.cassandra.sidecar.client.request.RequestExecutorTest; |
| import org.apache.cassandra.sidecar.client.retry.RetryAction; |
| import org.apache.cassandra.sidecar.client.retry.RetryPolicy; |
| import org.apache.cassandra.sidecar.common.ApiEndpointsV1; |
| import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets; |
| import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest; |
| import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest; |
| import org.apache.cassandra.sidecar.common.request.Request; |
| import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload; |
| import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobResponsePayload; |
| import org.apache.cassandra.sidecar.common.request.data.MD5Digest; |
| import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; |
| import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; |
| import org.apache.cassandra.sidecar.common.response.HealthResponse; |
| import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse; |
| import org.apache.cassandra.sidecar.common.response.NodeSettings; |
| import org.apache.cassandra.sidecar.common.response.RingResponse; |
| import org.apache.cassandra.sidecar.common.response.SSTableImportResponse; |
| import org.apache.cassandra.sidecar.common.response.SchemaResponse; |
| import org.apache.cassandra.sidecar.common.response.TimeSkewResponse; |
| import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; |
| import org.apache.cassandra.sidecar.common.response.data.RingEntry; |
| import org.apache.cassandra.sidecar.common.utils.HttpRange; |
| import org.apache.cassandra.sidecar.foundation.RestoreJobSecretsGen; |
| |
| import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; |
| import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; |
| import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; |
| import static io.netty.handler.codec.http.HttpResponseStatus.OK; |
| import static io.netty.handler.codec.http.HttpResponseStatus.PARTIAL_CONTENT; |
| import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.JOB_ID_PATH_PARAM; |
| import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.KEYSPACE_PATH_PARAM; |
| import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.TABLE_PATH_PARAM; |
| import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32; |
| import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.assertThatException; |
| import static org.assertj.core.api.Assertions.assertThatExceptionOfType; |
| import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; |
| import static org.assertj.core.api.Assertions.assertThatThrownBy; |
| import static org.assertj.core.api.Assertions.fail; |
| |
| abstract class SidecarClientTest |
| { |
| SidecarClient client; |
| List<MockWebServer> servers; |
| List<SidecarInstanceImpl> instances; |
| |
| @BeforeEach |
| void setup() |
| { |
| servers = new ArrayList<>(); |
| for (int i = 0; i < 4; i++) |
| { |
| servers.add(new MockWebServer()); |
| } |
| |
| instances = servers.stream() |
| .map(RequestExecutorTest::newSidecarInstance) |
| .collect(Collectors.toList()); |
| client = initialize(instances); |
| } |
| |
| protected abstract SidecarClient initialize(List<SidecarInstanceImpl> instances); |
| |
| @AfterEach |
| void tearDown() throws Exception |
| { |
| for (MockWebServer server : servers) |
| { |
| server.shutdown(); |
| } |
| client.close(); |
| } |
| |
| @Test |
| void testSidecarHealthOk() throws Exception |
| { |
| MockResponse response = new MockResponse() |
| .setResponseCode(200) |
| .setHeader("content-type", "application/json") |
| .setBody("{\"status\":\"OK\"}"); |
| enqueue(response); |
| |
| HealthResponse result = client.sidecarHealth().get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.status()).isEqualToIgnoringCase("OK"); |
| assertThat(result.isOk()).isTrue(); |
| |
| validateResponseServed(ApiEndpointsV1.HEALTH_ROUTE); |
| } |
| |
| @Test |
| void testSidecarHealthNotOk() throws Exception |
| { |
| MockResponse response = new MockResponse() |
| .setResponseCode(503) |
| .setHeader("content-type", "application/json") |
| .setBody("{\"status\":\"NOT_OK\"}"); |
| enqueue(response); |
| |
| assertThatThrownBy(() -> client.sidecarHealth().get(30, TimeUnit.SECONDS)) |
| .isInstanceOf(ExecutionException.class) |
| .hasCauseInstanceOf(RetriesExhaustedException.class); |
| |
| validateResponseServed(ApiEndpointsV1.HEALTH_ROUTE); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Test |
| void testCassandraDeprecatedHealthOk() throws Exception |
| { |
| MockResponse response = new MockResponse() |
| .setResponseCode(200) |
| .setHeader("content-type", "application/json") |
| .setBody("{\"status\":\"OK\"}"); |
| enqueue(response); |
| |
| HealthResponse result = client.cassandraHealth().get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.status()).isEqualToIgnoringCase("OK"); |
| assertThat(result.isOk()).isTrue(); |
| |
| validateResponseServed(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Test |
| void testCassandraDeprecatedHealthNotOk() throws Exception |
| { |
| MockResponse response = new MockResponse() |
| .setResponseCode(503) |
| .setHeader("content-type", "application/json") |
| .setBody("{\"status\":\"NOT_OK\"}"); |
| enqueue(response); |
| |
| assertThatThrownBy(() -> client.cassandraHealth().get(30, TimeUnit.SECONDS)) |
| .isInstanceOf(ExecutionException.class) |
| .hasCauseInstanceOf(RetriesExhaustedException.class); |
| |
| validateResponseServed(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE); |
| } |
| |
| @Test |
| void testCassandraNativeHealthOk() throws Exception |
| { |
| MockResponse response = new MockResponse() |
| .setResponseCode(200) |
| .setHeader("content-type", "application/json") |
| .setBody("{\"status\":\"OK\"}"); |
| enqueue(response); |
| |
| HealthResponse result = client.cassandraNativeHealth().get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.status()).isEqualToIgnoringCase("OK"); |
| assertThat(result.isOk()).isTrue(); |
| |
| validateResponseServed(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE); |
| } |
| |
| @Test |
| void testCassandraNativeHealthNotOk() throws Exception |
| { |
| MockResponse response = new MockResponse() |
| .setResponseCode(503) |
| .setHeader("content-type", "application/json") |
| .setBody("{\"status\":\"NOT_OK\"}"); |
| enqueue(response); |
| |
| assertThatThrownBy(() -> client.cassandraNativeHealth().get(30, TimeUnit.SECONDS)) |
| .isInstanceOf(ExecutionException.class) |
| .hasCauseInstanceOf(RetriesExhaustedException.class); |
| |
| validateResponseServed(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE); |
| } |
| |
| @Test |
| void testCassandraJmxHealthOk() throws Exception |
| { |
| MockResponse response = new MockResponse() |
| .setResponseCode(200) |
| .setHeader("content-type", "application/json") |
| .setBody("{\"status\":\"OK\"}"); |
| enqueue(response); |
| |
| HealthResponse result = client.cassandraJmxHealth().get(1, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.status()).isEqualTo("OK"); |
| assertThat(result.isOk()).isTrue(); |
| |
| validateResponseServed(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE); |
| } |
| |
| @Test |
| void testCassandraJmxHealthNotOk() throws Exception |
| { |
| MockResponse response = new MockResponse() |
| .setResponseCode(503) |
| .setHeader("content-type", "application/json") |
| .setBody("{\"status\":\"NOT_OK\"}"); |
| enqueue(response); |
| |
| assertThatThrownBy(() -> client.cassandraJmxHealth().get(1, TimeUnit.SECONDS)) |
| .isInstanceOf(ExecutionException.class) |
| .hasCauseInstanceOf(RetriesExhaustedException.class); |
| |
| validateResponseServed(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE); |
| } |
| |
| @Test |
| void testFullSchema() throws Exception |
| { |
| String fullSchemaAsString = "{\"schema\":\"CREATE KEYSPACE sample_ks.sample_table ...\"}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(fullSchemaAsString); |
| enqueue(response); |
| |
| SchemaResponse result = client.fullSchema().get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.keyspace()).isNull(); |
| assertThat(result.schema()).isEqualTo("CREATE KEYSPACE sample_ks.sample_table ..."); |
| |
| validateResponseServed(ApiEndpointsV1.ALL_KEYSPACES_SCHEMA_ROUTE); |
| } |
| |
| @Test |
| void testSchema() throws Exception |
| { |
| String schemaAsString = "{\"keyspace\":\"cycling\",\"schema\":\"CREATE KEYSPACE sample_ks.sample_table ...\"}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(schemaAsString); |
| enqueue(response); |
| |
| SchemaResponse result = client.schema("cycling").get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.keyspace()).isEqualTo("cycling"); |
| assertThat(result.schema()).isEqualTo("CREATE KEYSPACE sample_ks.sample_table ..."); |
| |
| validateResponseServed(ApiEndpointsV1.KEYSPACE_SCHEMA_ROUTE.replaceAll(KEYSPACE_PATH_PARAM, |
| "cycling")); |
| } |
| |
| @Test |
| void testRing() throws Exception |
| { |
| String schemaAsString = "[{\"datacenter\":\"dc\",\"address\":\"127.0.0.1\",\"port\":80,\"rack\":\"r1\"," + |
| "\"status\":\"up\",\"state\":\"normal\",\"load\":\"1 KiB\",\"owns\":\"1%\"," + |
| "\"token\":\"100\",\"fqdn\":\"local\",\"hostId\":\"000\"}]"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(schemaAsString); |
| enqueue(response); |
| |
| RingResponse result = client.ring("cycling").get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull() |
| .hasSize(1); |
| RingEntry entry = result.iterator().next(); |
| assertThat(entry.datacenter()).isEqualTo("dc"); |
| assertThat(entry.address()).isEqualTo("127.0.0.1"); |
| assertThat(entry.port()).isEqualTo(80); |
| assertThat(entry.rack()).isEqualTo("r1"); |
| assertThat(entry.status()).isEqualTo("up"); |
| assertThat(entry.state()).isEqualTo("normal"); |
| assertThat(entry.load()).isEqualTo("1 KiB"); |
| assertThat(entry.owns()).isEqualTo("1%"); |
| assertThat(entry.token()).isEqualTo("100"); |
| assertThat(entry.fqdn()).isEqualTo("local"); |
| assertThat(entry.hostId()).isEqualTo("000"); |
| |
| validateResponseServed(ApiEndpointsV1.RING_ROUTE_PER_KEYSPACE.replaceAll(KEYSPACE_PATH_PARAM, |
| "cycling")); |
| } |
| |
| @Test |
| public void testNodeSettings() throws Exception |
| { |
| String nodeSettingsAsString = "{\"partitioner\":\"test-partitioner\", \"releaseVersion\": \"4.0.0\"}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(nodeSettingsAsString); |
| enqueue(response); |
| |
| NodeSettings result = client.nodeSettings().get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.partitioner()).isEqualTo("test-partitioner"); |
| assertThat(result.releaseVersion()).isEqualTo("4.0.0"); |
| |
| validateResponseServed(ApiEndpointsV1.NODE_SETTINGS_ROUTE); |
| } |
| |
| @Test |
| public void testNodeSettingsFromSpecifiedInstance() throws Exception |
| { |
| String nodeSettingsAsString = "{\"partitioner\":\"test-partitioner\", \"releaseVersion\": \"4.0.0\"}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(nodeSettingsAsString); |
| MockWebServer mockWebServer = servers.get(1); |
| mockWebServer.enqueue(response); |
| |
| SidecarInstanceImpl instance = new SidecarInstanceImpl(mockWebServer.getHostName(), mockWebServer.getPort()); |
| NodeSettings result = client.nodeSettings(instance).get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.partitioner()).isEqualTo("test-partitioner"); |
| assertThat(result.releaseVersion()).isEqualTo("4.0.0"); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.NODE_SETTINGS_ROUTE); |
| } |
| |
| @Test |
| public void testGossipInfo() throws Exception |
| { |
| String gossipInfoAsString = "{\"/127.0.0.1:7000\":{\"generation\":\"1\",\"schema\":\"4994b214\"," + |
| "\"rack\":\"r2\",\"heartbeat\":\"214\",\"releaseVersion\":\"4.0.7\"," + |
| "\"sstableVersions\":\"big-nb\"}}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(gossipInfoAsString); |
| enqueue(response); |
| |
| GossipInfoResponse result = client.gossipInfo().get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull().hasSize(1); |
| String key = result.entrySet().iterator().next().getKey(); |
| GossipInfoResponse.GossipInfo gossipInfo = result.get(key); |
| assertThat(gossipInfo.generation()).isEqualTo("1"); |
| assertThat(gossipInfo.schema()).isEqualTo("4994b214"); |
| assertThat(gossipInfo.rack()).isEqualTo("r2"); |
| assertThat(gossipInfo.heartbeat()).isEqualTo("214"); |
| assertThat(gossipInfo.releaseVersion()).isEqualTo("4.0.7"); |
| assertThat(gossipInfo.sstableVersions()).isEqualTo(Collections.singletonList("big-nb")); |
| |
| validateResponseServed(ApiEndpointsV1.GOSSIP_INFO_ROUTE); |
| } |
| |
| @Test |
| public void testTimeSkew() throws Exception |
| { |
| String timeSkewAsString = "{\"currentTime\":\"123456789\",\"allowableSkewInMinutes\":\"122\"}}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(timeSkewAsString); |
| enqueue(response); |
| |
| TimeSkewResponse result = client.timeSkew().get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.allowableSkewInMinutes).isEqualTo(122); |
| assertThat(result.currentTime).isEqualTo(123456789); |
| |
| validateResponseServed(ApiEndpointsV1.TIME_SKEW_ROUTE); |
| } |
| |
| @Test |
| public void testTimeSkewFromReplicaSet() throws Exception |
| { |
| String timeSkewAsString = "{\"currentTime\":\"5555555\",\"allowableSkewInMinutes\":\"24\"}}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(timeSkewAsString); |
| enqueue(response); |
| |
| TimeSkewResponse result = client.timeSkew(instances.subList(1, 2)).get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.allowableSkewInMinutes).isEqualTo(24); |
| assertThat(result.currentTime).isEqualTo(5555555); |
| |
| validateResponseServed(ApiEndpointsV1.TIME_SKEW_ROUTE); |
| } |
| |
| @Test |
| public void testTokenRangeReplicasFromReplicaSet() throws Exception |
| { |
| String keyspace = "test"; |
| String nodeAddress = "127.0.0.1"; |
| int port = 7000; |
| String nodeWithPort = nodeAddress + ":" + port; |
| String expectedRangeStart = "-9223372036854775808"; |
| String expectedRangeEnd = "9223372036854775807"; |
| String tokenRangeReplicasAsString = "{\"replicaMetadata\":{\"127.0.0.1:7000\":{" + |
| "\"state\":\"Normal\"," + |
| "\"status\":\"Up\"," + |
| "\"fqdn\":\"localhost\"," + |
| "\"address\":\"127.0.0.1\"," + |
| "\"port\":7000," + |
| "\"datacenter\":\"datacenter1\"}}," + |
| "\"writeReplicas\":[{\"start\":\"-9223372036854775808\"," + |
| "\"end\":\"9223372036854775807\",\"replicasByDatacenter\":" + |
| "{\"datacenter1\":[\"127.0.0.1:7000\"]}}],\"readReplicas\":" + |
| "[{\"start\":\"-9223372036854775808\",\"end\":\"9223372036854775807\"," + |
| "\"replicasByDatacenter\":{\"datacenter1\":[\"127.0.0.1:7000\"]}}]}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(tokenRangeReplicasAsString); |
| enqueue(response); |
| |
| TokenRangeReplicasResponse result = client.tokenRangeReplicas(instances.subList(1, 2), keyspace) |
| .get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.writeReplicas()).hasSize(1); |
| TokenRangeReplicasResponse.ReplicaInfo writeReplica = result.writeReplicas().get(0); |
| assertThat(writeReplica.start()).isEqualTo(expectedRangeStart); |
| assertThat(writeReplica.end()).isEqualTo(expectedRangeEnd); |
| assertThat(writeReplica.replicasByDatacenter()).containsKey("datacenter1"); |
| assertThat(writeReplica.replicasByDatacenter().get("datacenter1")).containsExactly(nodeWithPort); |
| assertThat(result.readReplicas()).hasSize(1); |
| TokenRangeReplicasResponse.ReplicaInfo readReplica = result.readReplicas().get(0); |
| assertThat(readReplica.start()).isEqualTo(expectedRangeStart); |
| assertThat(readReplica.end()).isEqualTo(expectedRangeEnd); |
| assertThat(readReplica.replicasByDatacenter()).containsKey("datacenter1"); |
| assertThat(readReplica.replicasByDatacenter().get("datacenter1")).containsExactly(nodeWithPort); |
| assertThat(result.replicaMetadata()).hasSize(1); |
| TokenRangeReplicasResponse.ReplicaMetadata instanceMetadata = result.replicaMetadata().get(nodeWithPort); |
| assertThat(instanceMetadata.state()).isEqualTo("Normal"); |
| assertThat(instanceMetadata.status()).isEqualTo("Up"); |
| assertThat(instanceMetadata.fqdn()).isEqualTo("localhost"); |
| assertThat(instanceMetadata.datacenter()).isEqualTo("datacenter1"); |
| |
| validateResponseServed(ApiEndpointsV1.KEYSPACE_TOKEN_MAPPING_ROUTE.replaceAll( |
| KEYSPACE_PATH_PARAM, keyspace)); |
| } |
| |
| @Test |
| public void testListSnapshotFiles() throws Exception |
| { |
| String responseAsString = "{\"snapshotFilesInfo\":[{\"size\":15,\"host\":\"localhost1\",\"port\":2020," + |
| "\"dataDirIndex\":1,\"snapshotName\":\"2023.04.11\",\"keySpaceName\":\"cycling\"," + |
| "\"tableName\":\"cyclist_name\",\"fileName\":\"nb-203-big-TOC.txt\"}]}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(responseAsString); |
| SidecarInstanceImpl sidecarInstance = instances.get(2); |
| MockWebServer mockWebServer = servers.get(2); |
| mockWebServer.enqueue(response); |
| |
| ListSnapshotFilesResponse result = client.listSnapshotFiles(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "2023.04.11") |
| .get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.snapshotFilesInfo()).hasSize(1); |
| ListSnapshotFilesResponse.FileInfo fileInfo = result.snapshotFilesInfo().get(0); |
| assertThat(fileInfo.size).isEqualTo(15); |
| assertThat(fileInfo.host).isEqualTo("localhost1"); |
| assertThat(fileInfo.port).isEqualTo(2020); |
| assertThat(fileInfo.dataDirIndex).isEqualTo(1); |
| assertThat(fileInfo.snapshotName).isEqualTo("2023.04.11"); |
| assertThat(fileInfo.keySpaceName).isEqualTo("cycling"); |
| assertThat(fileInfo.tableName).isEqualTo("cyclist_name"); |
| assertThat(fileInfo.fileName).isEqualTo("nb-203-big-TOC.txt"); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SNAPSHOTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.11") |
| + "?includeSecondaryIndexFiles=true"); |
| } |
| |
| /** |
| * CASSANDRASC-94 introduced a new field ({@code tableId}) to the payload when listing snapshots. We |
| * need to make sure the client is able to handle the payload with the additional field (and ignore it). |
| * |
| * @throws Exception when the test fails |
| */ |
| @Test |
| public void testListSnapshotFilesPayloadWithTableId() throws Exception |
| { |
| String responseAsString = "{\"snapshotFilesInfo\":[{\"size\":15,\"host\":\"localhost1\",\"port\":2020," + |
| "\"dataDirIndex\":1,\"snapshotName\":\"2023.04.11\",\"keySpaceName\":\"cycling\"," + |
| "\"tableName\":\"cyclist_name\",\"tableId\":\"1234\",\"fileName\":" + |
| "\"nb-203-big-TOC.txt\"}]}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(responseAsString); |
| SidecarInstanceImpl sidecarInstance = instances.get(2); |
| MockWebServer mockWebServer = servers.get(2); |
| mockWebServer.enqueue(response); |
| |
| ListSnapshotFilesResponse result = client.listSnapshotFiles(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "2023.04.11") |
| .get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.snapshotFilesInfo()).hasSize(1); |
| ListSnapshotFilesResponse.FileInfo fileInfo = result.snapshotFilesInfo().get(0); |
| assertThat(fileInfo.size).isEqualTo(15); |
| assertThat(fileInfo.host).isEqualTo("localhost1"); |
| assertThat(fileInfo.port).isEqualTo(2020); |
| assertThat(fileInfo.dataDirIndex).isEqualTo(1); |
| assertThat(fileInfo.snapshotName).isEqualTo("2023.04.11"); |
| assertThat(fileInfo.keySpaceName).isEqualTo("cycling"); |
| assertThat(fileInfo.tableName).isEqualTo("cyclist_name"); |
| assertThat(fileInfo.fileName).isEqualTo("nb-203-big-TOC.txt"); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SNAPSHOTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.11") |
| + "?includeSecondaryIndexFiles=true"); |
| } |
| |
| @Test |
| public void testListSnapshotFilesWithoutSecondaryIndexFiles() throws Exception |
| { |
| String responseAsString = "{\"snapshotFilesInfo\":[{\"size\":15,\"host\":\"localhost1\",\"port\":2020," + |
| "\"dataDirIndex\":1,\"snapshotName\":\"2023.04.11\",\"keySpaceName\":\"cycling\"," + |
| "\"tableName\":\"cyclist_name\",\"fileName\":\"nb-203-big-TOC.txt\"}]}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(responseAsString); |
| SidecarInstanceImpl sidecarInstance = instances.get(2); |
| MockWebServer mockWebServer = servers.get(2); |
| mockWebServer.enqueue(response); |
| |
| ListSnapshotFilesResponse result = client.listSnapshotFiles(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "2023.04.11", |
| false) |
| .get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.snapshotFilesInfo()).hasSize(1); |
| ListSnapshotFilesResponse.FileInfo fileInfo = result.snapshotFilesInfo().get(0); |
| assertThat(fileInfo.size).isEqualTo(15); |
| assertThat(fileInfo.host).isEqualTo("localhost1"); |
| assertThat(fileInfo.port).isEqualTo(2020); |
| assertThat(fileInfo.dataDirIndex).isEqualTo(1); |
| assertThat(fileInfo.snapshotName).isEqualTo("2023.04.11"); |
| assertThat(fileInfo.keySpaceName).isEqualTo("cycling"); |
| assertThat(fileInfo.tableName).isEqualTo("cyclist_name"); |
| assertThat(fileInfo.fileName).isEqualTo("nb-203-big-TOC.txt"); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SNAPSHOTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.11")); |
| } |
| |
| |
| @Test |
| void testClearSnapshot() throws Exception |
| { |
| MockResponse response = new MockResponse().setResponseCode(OK.code()); |
| SidecarInstanceImpl sidecarInstance = instances.get(2); |
| MockWebServer mockWebServer = servers.get(2); |
| mockWebServer.enqueue(response); |
| |
| client.clearSnapshot(sidecarInstance, "cycling", "cyclist_name", "2023.04.11") |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SNAPSHOTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.11")); |
| assertThat(request.getMethod()).isEqualTo("DELETE"); |
| } |
| |
| @Test |
| void testCreateSnapshot() throws Exception |
| { |
| MockResponse response = new MockResponse().setResponseCode(OK.code()); |
| SidecarInstanceImpl sidecarInstance = instances.get(3); |
| MockWebServer mockWebServer = servers.get(3); |
| mockWebServer.enqueue(response); |
| |
| client.createSnapshot(sidecarInstance, "cycling", "cyclist_name", "2023.04.11") |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SNAPSHOTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.11")); |
| assertThat(request.getMethod()).isEqualTo("PUT"); |
| } |
| |
| @Test |
| void testCreateSnapshotWithTTL() throws Exception |
| { |
| MockResponse response = new MockResponse().setResponseCode(OK.code()); |
| SidecarInstanceImpl sidecarInstance = instances.get(3); |
| MockWebServer mockWebServer = servers.get(3); |
| mockWebServer.enqueue(response); |
| |
| client.createSnapshot(sidecarInstance, "cycling", "cyclist_name", "2023.04.11", "2d") |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| String expected = ApiEndpointsV1.SNAPSHOTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.11") + "?ttl=2d"; |
| assertThat(request.getPath()).isEqualTo(expected); |
| assertThat(request.getMethod()).isEqualTo("PUT"); |
| } |
| |
| @Test |
| void testCleanUploadSession() throws Exception |
| { |
| MockResponse response = new MockResponse().setResponseCode(OK.code()); |
| SidecarInstanceImpl sidecarInstance = instances.get(0); |
| MockWebServer mockWebServer = servers.get(0); |
| mockWebServer.enqueue(response); |
| |
| client.cleanUploadSession(sidecarInstance, "00000") |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SSTABLE_CLEANUP_ROUTE |
| .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "00000")); |
| assertThat(request.getMethod()).isEqualTo("DELETE"); |
| } |
| |
| @Test |
| void testSSTableImport() throws Exception |
| { |
| String responseAsString = "{\"success\":true,\"uploadId\":\"0000-0000\",\"keyspace\":\"cycling\"," + |
| "\"tableName\":\"cyclist_name\"}"; |
| MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(responseAsString); |
| SidecarInstanceImpl sidecarInstance = instances.get(0); |
| MockWebServer mockWebServer = servers.get(0); |
| mockWebServer.enqueue(response); |
| |
| ImportSSTableRequest.ImportOptions options = new ImportSSTableRequest.ImportOptions(); |
| SSTableImportResponse result = client.importSSTableRequest(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "0000-0000", |
| options) |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(result).isNotNull(); |
| assertThat(result.keyspace()).isEqualTo("cycling"); |
| assertThat(result.tableName()).isEqualTo("cyclist_name"); |
| assertThat(result.success()).isTrue(); |
| assertThat(result.uploadId()).isEqualTo("0000-0000"); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SSTABLE_IMPORT_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000")); |
| assertThat(request.getMethod()).isEqualTo("PUT"); |
| } |
| |
| @Test |
| void testSSTableImportWithAcceptedResponse() throws Exception |
| { |
| String responseAsString = "{\"success\":true,\"uploadId\":\"0000-0000\",\"keyspace\":\"cycling\"," + |
| "\"tableName\":\"cyclist_name\"}"; |
| SidecarInstanceImpl sidecarInstance = instances.get(0); |
| MockWebServer mockWebServer = servers.get(0); |
| mockWebServer.enqueue(new MockResponse().setResponseCode(ACCEPTED.code())); |
| mockWebServer.enqueue(new MockResponse().setResponseCode(ACCEPTED.code())); |
| mockWebServer.enqueue(new MockResponse().setResponseCode(ACCEPTED.code())); |
| mockWebServer.enqueue(new MockResponse().setResponseCode(ACCEPTED.code())); |
| mockWebServer.enqueue(new MockResponse().setResponseCode(OK.code()).setBody(responseAsString)); |
| |
| ImportSSTableRequest.ImportOptions options = new ImportSSTableRequest.ImportOptions(); |
| SSTableImportResponse result = client.importSSTableRequest(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "0000-0000", |
| options) |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(result).isNotNull(); |
| assertThat(result.keyspace()).isEqualTo("cycling"); |
| assertThat(result.tableName()).isEqualTo("cyclist_name"); |
| assertThat(result.success()).isTrue(); |
| assertThat(result.uploadId()).isEqualTo("0000-0000"); |
| |
| assertThat(mockWebServer.getRequestCount()).isEqualTo(5); |
| RecordedRequest request = mockWebServer.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SSTABLE_IMPORT_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000")); |
| assertThat(request.getMethod()).isEqualTo("PUT"); |
| } |
| |
| @Test |
| void testUploadSSTableFailsWhenFileDoesNotExist() |
| { |
| |
| assertThatIllegalArgumentException() |
| .isThrownBy(() -> client.uploadSSTableRequest(new SidecarInstanceImpl("host", 8080), |
| "cycling", |
| "cyclist_name", |
| "0000-0000", |
| "nb-1-big-TOC.txt", |
| null, |
| "path") |
| .get(30, TimeUnit.SECONDS)) |
| .withMessage("File 'path' does not exist"); |
| } |
| |
| @Test |
| void testUploadSSTableWithoutDigest(@TempDir Path tempDirectory) throws Exception |
| { |
| Path fileToUpload = prepareFile(tempDirectory); |
| try (MockWebServer server = new MockWebServer()) |
| { |
| server.enqueue(new MockResponse().setResponseCode(OK.code())); |
| |
| SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); |
| client.uploadSSTableRequest(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "0000-0000", |
| "nb-1-big-TOC.txt", |
| null, |
| fileToUpload.toString()) |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(server.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = server.takeRequest(); |
| assertThat(request.getPath()) |
| .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE |
| .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000") |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt")); |
| assertThat(request.getMethod()).isEqualTo("PUT"); |
| assertThat(request.getHeader(HttpHeaderNames.CONTENT_MD5.toString())).isNull(); |
| assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80"); |
| assertThat(request.getBodySize()).isEqualTo(80); |
| } |
| } |
| |
| @Test |
| void testUploadSSTableWithMD5Digest(@TempDir Path tempDirectory) throws Exception |
| { |
| Path fileToUpload = prepareFile(tempDirectory); |
| try (MockWebServer server = new MockWebServer()) |
| { |
| server.enqueue(new MockResponse().setResponseCode(OK.code())); |
| |
| SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); |
| client.uploadSSTableRequest(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "0000-0000", |
| "nb-1-big-TOC.txt", |
| new MD5Digest("15a69dc6501aa5ae17af037fe053f610"), |
| fileToUpload.toString()) |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(server.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = server.takeRequest(); |
| assertThat(request.getPath()) |
| .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE |
| .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000") |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt")); |
| assertThat(request.getMethod()).isEqualTo("PUT"); |
| assertThat(request.getHeader(HttpHeaderNames.CONTENT_MD5.toString())) |
| .isEqualTo("15a69dc6501aa5ae17af037fe053f610"); |
| assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80"); |
| assertThat(request.getBodySize()).isEqualTo(80); |
| } |
| } |
| |
| @Test |
| void testUploadSSTableWithXXHashDigest(@TempDir Path tempDirectory) throws Exception |
| { |
| Path fileToUpload = prepareFile(tempDirectory); |
| try (MockWebServer server = new MockWebServer()) |
| { |
| server.enqueue(new MockResponse().setResponseCode(OK.code())); |
| |
| SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); |
| client.uploadSSTableRequest(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "0000-0000", |
| "nb-1-big-TOC.txt", |
| new XXHash32Digest("15a69dc6501aa5ae17af037fe053f610"), |
| fileToUpload.toString()) |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(server.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = server.takeRequest(); |
| assertThat(request.getPath()) |
| .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE |
| .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000") |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt")); |
| assertThat(request.getMethod()).isEqualTo("PUT"); |
| assertThat(request.getHeader(CONTENT_XXHASH32)) |
| .isEqualTo("15a69dc6501aa5ae17af037fe053f610"); |
| assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isNull(); |
| assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80"); |
| assertThat(request.getBodySize()).isEqualTo(80); |
| } |
| } |
| |
| @Test |
| void testUploadSSTableWithXXHashDigestAndSeed(@TempDir Path tempDirectory) throws Exception |
| { |
| Path fileToUpload = prepareFile(tempDirectory); |
| try (MockWebServer server = new MockWebServer()) |
| { |
| server.enqueue(new MockResponse().setResponseCode(OK.code())); |
| |
| SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); |
| client.uploadSSTableRequest(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "0000-0000", |
| "nb-1-big-TOC.txt", |
| new XXHash32Digest("15a69dc6501aa5ae17af037fe053f610", "123456"), |
| fileToUpload.toString()) |
| .get(30, TimeUnit.SECONDS); |
| |
| assertThat(server.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = server.takeRequest(); |
| assertThat(request.getPath()) |
| .isEqualTo(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE |
| .replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000") |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt")); |
| assertThat(request.getMethod()).isEqualTo("PUT"); |
| assertThat(request.getHeader(CONTENT_XXHASH32)) |
| .isEqualTo("15a69dc6501aa5ae17af037fe053f610"); |
| assertThat(request.getHeader(CONTENT_XXHASH32_SEED)).isEqualTo("123456"); |
| assertThat(request.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString())).isEqualTo("80"); |
| assertThat(request.getBodySize()).isEqualTo(80); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = { true, false }) |
| void testLegacyStreamSSTableComponentWithNoRange(boolean useLegacyApi) throws Exception |
| { |
| try (MockWebServer server = new MockWebServer()) |
| { |
| CountDownLatch latch = new CountDownLatch(1); |
| List<byte[]> receivedBytes = new ArrayList<>(); |
| StreamConsumer mockStreamConsumer = new StreamConsumer() |
| { |
| @Override |
| public void onRead(StreamBuffer buffer) |
| { |
| assertThat(buffer.readableBytes()).isGreaterThan(0); |
| byte[] dst = new byte[buffer.readableBytes()]; |
| buffer.copyBytes(0, dst, 0, buffer.readableBytes()); |
| receivedBytes.add(dst); |
| } |
| |
| @Override |
| public void onComplete() |
| { |
| latch.countDown(); |
| } |
| |
| @Override |
| public void onError(Throwable throwable) |
| { |
| latch.countDown(); |
| fail("Should not encounter an error", throwable); |
| } |
| }; |
| InputStream inputStream = resourceInputStream("sstables/nb-1-big-TOC.txt"); |
| Buffer buffer = Okio.buffer(Okio.source(inputStream)).getBuffer(); |
| Okio.use(buffer, buffer1 -> { |
| try |
| { |
| return buffer1.writeAll(Okio.source(inputStream)); |
| } |
| catch (IOException e) |
| { |
| throw new RuntimeException(e); |
| } |
| }); |
| |
| SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); |
| MockResponse response = |
| new MockResponse().setResponseCode(OK.code()) |
| .setHeader(HttpHeaderNames.CONTENT_TYPE.toString(), |
| HttpHeaderValues.APPLICATION_OCTET_STREAM) |
| .setHeader(HttpHeaderNames.ACCEPT_RANGES.toString(), "bytes") |
| .setBody(buffer); |
| server.enqueue(response); |
| |
| String expectedPath; |
| if (useLegacyApi) |
| { |
| client.streamSSTableComponent(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "2023.04.12", |
| "nb-203-big-Data.db", |
| null, |
| mockStreamConsumer); |
| expectedPath = ApiEndpointsV1.COMPONENTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.12") |
| .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-203-big-Data.db"); |
| } |
| else |
| { |
| ListSnapshotFilesResponse.FileInfo fileInfo = new ListSnapshotFilesResponse.FileInfo(2023, |
| server.getHostName(), |
| server.getPort(), 0, |
| "2023.04.12", |
| "cycling", |
| "cyclist_name-1234", |
| "nb-1-big-TOC.txt"); |
| client.streamSSTableComponent(sidecarInstance, fileInfo, null, mockStreamConsumer); |
| expectedPath = fileInfo.componentDownloadUrl(); |
| } |
| |
| assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); |
| |
| RecordedRequest request = server.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(expectedPath); |
| assertThat(request.getHeader("User-Agent")).isEqualTo("cassandra-sidecar-test/0.0.1"); |
| assertThat(request.getHeader("range")).isNull(); |
| |
| byte[] bytes = receivedBytes.stream() |
| .collect(ByteArrayOutputStream::new, |
| (outputStream, src) -> outputStream.write(src, 0, src.length), |
| (outputStream, src) -> { |
| }) |
| .toByteArray(); |
| assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("Summary.db\n" + |
| "TOC.txt\n" + |
| "Statistics.db\n" + |
| "Filter.db\n" + |
| "Data.db\n" + |
| "CRC.db\n" + |
| "Digest.crc32\n" + |
| "Index.db\n"); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = { true, false }) |
| void testStreamSSTableComponentWithRange(boolean useLegacyApi) throws Exception |
| { |
| try (MockWebServer server = new MockWebServer()) |
| { |
| CountDownLatch latch = new CountDownLatch(1); |
| List<byte[]> receivedBytes = new ArrayList<>(); |
| StreamConsumer mockStreamConsumer = new StreamConsumer() |
| { |
| @Override |
| public void onRead(StreamBuffer buffer) |
| { |
| assertThat(buffer.readableBytes()).isGreaterThan(0); |
| byte[] dst = new byte[buffer.readableBytes()]; |
| buffer.copyBytes(0, dst, 0, buffer.readableBytes()); |
| receivedBytes.add(dst); |
| } |
| |
| @Override |
| public void onComplete() |
| { |
| latch.countDown(); |
| } |
| |
| @Override |
| public void onError(Throwable throwable) |
| { |
| latch.countDown(); |
| fail("Should not encounter an error", throwable); |
| } |
| }; |
| |
| SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); |
| MockResponse response = |
| new MockResponse().setResponseCode(PARTIAL_CONTENT.code()) |
| .setHeader(HttpHeaderNames.CONTENT_TYPE.toString(), |
| HttpHeaderValues.APPLICATION_OCTET_STREAM) |
| .setHeader(HttpHeaderNames.ACCEPT_RANGES.toString(), "bytes") |
| .setHeader(HttpHeaderNames.CONTENT_RANGE.toString(), "bytes 10-20/80") |
| .setBody("TOC.txt\nSt"); |
| server.enqueue(response); |
| |
| String expectedPath; |
| if (useLegacyApi) |
| { |
| client.streamSSTableComponent(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "2023.04.12", |
| "nb-1-big-TOC.txt", |
| HttpRange.of(10, 20), |
| mockStreamConsumer); |
| expectedPath = ApiEndpointsV1.COMPONENTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.12") |
| .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt"); |
| } |
| else |
| { |
| ListSnapshotFilesResponse.FileInfo fileInfo = new ListSnapshotFilesResponse.FileInfo(2023, |
| server.getHostName(), |
| server.getPort(), 0, |
| "2023.04.12", |
| "cycling", |
| "cyclist_name-1234", |
| "nb-1-big-TOC.txt"); |
| |
| client.streamSSTableComponent(sidecarInstance, fileInfo, HttpRange.of(10, 20), mockStreamConsumer); |
| expectedPath = fileInfo.componentDownloadUrl(); |
| } |
| |
| assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); |
| |
| RecordedRequest request = server.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(expectedPath); |
| assertThat(request.getHeader("User-Agent")).isEqualTo("cassandra-sidecar-test/0.0.1"); |
| assertThat(request.getHeader("range")).isEqualTo("bytes=10-20"); |
| |
| byte[] bytes = receivedBytes.stream() |
| .collect(ByteArrayOutputStream::new, |
| (outputStream, src) -> outputStream.write(src, 0, src.length), |
| (outputStream, src) -> { |
| }) |
| .toByteArray(); |
| assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("TOC.txt\nSt"); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = { true, false }) |
| void testStreamSSTableComponentFailsMidStream(boolean useLegacyApi) throws Exception |
| { |
| try (MockWebServer server = new MockWebServer()) |
| { |
| CountDownLatch latch = new CountDownLatch(1); |
| List<byte[]> receivedBytes = new ArrayList<>(); |
| StreamConsumer mockStreamConsumer = new StreamConsumer() |
| { |
| @Override |
| public void onRead(StreamBuffer buffer) |
| { |
| assertThat(buffer.readableBytes()).isGreaterThan(0); |
| byte[] dst = new byte[buffer.readableBytes()]; |
| buffer.copyBytes(0, dst, 0, buffer.readableBytes()); |
| receivedBytes.add(dst); |
| } |
| |
| @Override |
| public void onComplete() |
| { |
| } |
| |
| @Override |
| public void onError(Throwable throwable) |
| { |
| latch.countDown(); |
| assertThat(throwable).isNotNull(); |
| } |
| }; |
| InputStream inputStream = resourceInputStream("sstables/nb-1-big-TOC.txt"); |
| Buffer buffer = Okio.buffer(Okio.source(inputStream)).getBuffer(); |
| Okio.use(buffer, buffer1 -> { |
| try |
| { |
| return buffer1.writeAll(Okio.source(inputStream)); |
| } |
| catch (IOException e) |
| { |
| throw new RuntimeException(e); |
| } |
| }); |
| |
| SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); |
| MockResponse response = |
| new MockResponse().setResponseCode(OK.code()) |
| .setHeader(HttpHeaderNames.CONTENT_TYPE.toString(), |
| HttpHeaderValues.APPLICATION_OCTET_STREAM) |
| .setHeader(HttpHeaderNames.ACCEPT_RANGES.toString(), "bytes") |
| .setBody(buffer) |
| .setSocketPolicy(SocketPolicy.DISCONNECT_DURING_RESPONSE_BODY); |
| server.enqueue(response); |
| |
| String expectedPath; |
| if (useLegacyApi) |
| { |
| client.streamSSTableComponent(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "2023.04.12", |
| "nb-1-big-TOC.txt", |
| null, |
| mockStreamConsumer); |
| expectedPath = ApiEndpointsV1.COMPONENTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.12") |
| .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt"); |
| } |
| else |
| { |
| |
| ListSnapshotFilesResponse.FileInfo fileInfo = new ListSnapshotFilesResponse.FileInfo(2023, |
| server.getHostName(), |
| server.getPort(), 0, |
| "2023.04.12", |
| "cycling", |
| "cyclist_name-1234", |
| "nb-1-big-TOC.txt"); |
| client.streamSSTableComponent(sidecarInstance, fileInfo, null, mockStreamConsumer); |
| expectedPath = fileInfo.componentDownloadUrl(); |
| } |
| |
| assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); |
| |
| RecordedRequest request = server.takeRequest(); |
| assertThat(request.getPath()).isEqualTo(expectedPath); |
| assertThat(request.getHeader("User-Agent")).isEqualTo("cassandra-sidecar-test/0.0.1"); |
| assertThat(request.getHeader("range")).isNull(); |
| assertThat(receivedBytes).hasSizeGreaterThan(0); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = { true, false }) |
| void testStreamSSTableComponentWithRetries(boolean useLegacyApi) throws Exception |
| { |
| try (MockWebServer server = new MockWebServer()) |
| { |
| CountDownLatch latch = new CountDownLatch(1); |
| List<byte[]> receivedBytes = new ArrayList<>(); |
| StreamConsumer mockStreamConsumer = new StreamConsumer() |
| { |
| @Override |
| public void onRead(StreamBuffer buffer) |
| { |
| assertThat(buffer.readableBytes()).isGreaterThan(0); |
| byte[] dst = new byte[buffer.readableBytes()]; |
| buffer.copyBytes(0, dst, 0, buffer.readableBytes()); |
| receivedBytes.add(dst); |
| } |
| |
| @Override |
| public void onComplete() |
| { |
| latch.countDown(); |
| } |
| |
| @Override |
| public void onError(Throwable throwable) |
| { |
| } |
| }; |
| |
| SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); |
| MockResponse response = |
| new MockResponse().setResponseCode(PARTIAL_CONTENT.code()) |
| .setHeader(HttpHeaderNames.CONTENT_TYPE.toString(), |
| HttpHeaderValues.APPLICATION_OCTET_STREAM) |
| .setHeader(HttpHeaderNames.ACCEPT_RANGES.toString(), "bytes") |
| .setHeader(HttpHeaderNames.CONTENT_RANGE.toString(), "bytes 10-20/80") |
| .setBody("TOC.txt\nSt"); |
| server.enqueue(new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()) |
| .setBody("{\"error\":\"some error\"}")); |
| server.enqueue(new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()) |
| .setBody("{\"error\":\"some error\"}")); |
| server.enqueue(response); |
| |
| String expectedPath; |
| if (useLegacyApi) |
| { |
| client.streamSSTableComponent(sidecarInstance, |
| "cycling", |
| "cyclist_name", |
| "2023.04.12", |
| "nb-1-big-TOC.txt", |
| HttpRange.of(10, 20), |
| mockStreamConsumer); |
| expectedPath = ApiEndpointsV1.COMPONENTS_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name") |
| .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.12") |
| .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt"); |
| } |
| else |
| { |
| ListSnapshotFilesResponse.FileInfo fileInfo = new ListSnapshotFilesResponse.FileInfo(2023, |
| server.getHostName(), |
| server.getPort(), 0, |
| "2023.04.12", |
| "cycling", |
| "cyclist_name-1234", |
| "nb-1-big-TOC.txt"); |
| client.streamSSTableComponent(sidecarInstance, fileInfo, HttpRange.of(10, 20), mockStreamConsumer); |
| expectedPath = fileInfo.componentDownloadUrl(); |
| } |
| assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); |
| |
| server.takeRequest(); // first 500 |
| server.takeRequest(); // second 500 |
| RecordedRequest request3 = server.takeRequest(); |
| assertThat(request3.getPath()).isEqualTo(expectedPath); |
| assertThat(request3.getHeader("User-Agent")).isEqualTo("cassandra-sidecar-test/0.0.1"); |
| assertThat(request3.getHeader("range")).isEqualTo("bytes=10-20"); |
| |
| byte[] bytes = receivedBytes.stream() |
| .collect(ByteArrayOutputStream::new, |
| (outputStream, src) -> outputStream.write(src, 0, src.length), |
| (outputStream, src) -> { |
| }) |
| .toByteArray(); |
| assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("TOC.txt\nSt"); |
| } |
| } |
| |
| @Test |
| void testFailsWithOneAttemptPerServer() |
| { |
| for (MockWebServer server : servers) |
| { |
| MockResponse response = new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()) |
| .setBody("{\"error\":\"some error\"}"); |
| server.enqueue(response); |
| } |
| |
| assertThatExceptionOfType(ExecutionException.class) |
| .isThrownBy(() -> client.schema("cycling").get(30, TimeUnit.SECONDS)) |
| .withRootCauseInstanceOf(RetriesExhaustedException.class) |
| .withMessageContaining("Unable to complete request '/api/v1/keyspaces/cycling/schema' after 4 attempts"); |
| } |
| |
| @Test |
| void testProvidingCustomRetryPolicy() throws ExecutionException, InterruptedException, TimeoutException |
| { |
| String nodeSettingsAsString = "{\"partitioner\":\"test-partitioner\", \"releaseVersion\": \"4.0.0\"}"; |
| MockResponse response = new MockResponse().setResponseCode(ACCEPTED.code()).setBody(nodeSettingsAsString); |
| enqueue(response); |
| |
| RequestContext requestContext = |
| client.requestBuilder() |
| .request(new NodeSettingsRequest()) |
| .retryPolicy(new RetryPolicy() |
| { |
| @Override |
| public void onResponse(CompletableFuture<HttpResponse> responseFuture, |
| Request request, |
| HttpResponse response, |
| Throwable throwable, |
| int attempts, |
| boolean canRetryOnADifferentHost, |
| RetryAction retryAction) |
| { |
| if (response != null && response.statusCode() == ACCEPTED.code()) |
| { |
| responseFuture.complete(response); |
| } |
| else |
| { |
| client.defaultRetryPolicy().onResponse(responseFuture, |
| request, |
| response, |
| throwable, |
| attempts, |
| canRetryOnADifferentHost, |
| retryAction); |
| } |
| } |
| }) |
| .build(); |
| NodeSettings result = client.<NodeSettings>executeRequestAsync(requestContext).get(30, TimeUnit.SECONDS); |
| assertThat(result).isNotNull(); |
| assertThat(result.partitioner()).isEqualTo("test-partitioner"); |
| assertThat(result.releaseVersion()).isEqualTo("4.0.0"); |
| |
| validateResponseServed(ApiEndpointsV1.NODE_SETTINGS_ROUTE); |
| } |
| |
| @Test |
| void testAcceptCreateRestoreJobRequest() throws Exception |
| { |
| String jobIdStr = "8e5799a4-d277-11ed-8d85-6916bb9b8056"; |
| enqueue(new MockResponse() |
| .setResponseCode(OK.code()) |
| .setBody("{\"jobId\":\"" + jobIdStr + "\",\"status\":\"CREATED\"}")); |
| |
| UUID jobId = UUID.fromString(jobIdStr); |
| long expireAt = System.currentTimeMillis() + 10000; |
| RestoreJobSecrets secrets = RestoreJobSecretsGen.genRestoreJobSecrets(); |
| CreateRestoreJobRequestPayload requestPayload = CreateRestoreJobRequestPayload.builder(secrets, expireAt) |
| .jobId(jobId) |
| .build(); |
| CreateRestoreJobResponsePayload responsePayload = client.createRestoreJob("cycling", |
| "rank_by_year_and_name", |
| requestPayload) |
| .join(); |
| |
| assertThat(responsePayload).isNotNull(); |
| assertThat(responsePayload.jobId()).isEqualTo(jobId); |
| assertThat(responsePayload.status()).isEqualTo("CREATED"); |
| |
| ObjectMapper mapper = new ObjectMapper(); |
| String expectedReqBodyString = mapper.writeValueAsString(requestPayload); |
| validateResponseServed(ApiEndpointsV1.CREATE_RESTORE_JOB_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "cycling") |
| .replaceAll(TABLE_PATH_PARAM, "rank_by_year_and_name") |
| .replaceAll(JOB_ID_PATH_PARAM, jobIdStr), |
| recordedRequest -> { |
| String reqBodyString = recordedRequest.getBody() |
| .readString(Charset.defaultCharset()); |
| assertThat(reqBodyString).isEqualTo(expectedReqBodyString); |
| }); |
| } |
| |
| @Test |
| void testCreateRestoreJobShouldNotRetryOnDifferentHostWithBadRequest() throws Exception |
| { |
| String jobIdStr = "8e5799a4-d277-11ed-8d85-6916bb9b8056"; |
| enqueue(new MockResponse() |
| .setResponseCode(BAD_REQUEST.code()) |
| .setBody("{\"status\":\"Fail\"," + |
| "\"message\":\"Error while decoding values, check your request body\"}")); |
| |
| UUID jobId = UUID.fromString(jobIdStr); |
| long expireAt = System.currentTimeMillis() + 10000; |
| RestoreJobSecrets secrets = RestoreJobSecretsGen.genRestoreJobSecrets(); |
| CreateRestoreJobRequestPayload requestPayload = CreateRestoreJobRequestPayload.builder(secrets, expireAt) |
| .jobId(jobId) |
| .build(); |
| assertThatException().isThrownBy(() -> client.createRestoreJob("badkeyspace", |
| "bad_table", |
| requestPayload) |
| .join()) |
| .withCauseInstanceOf(RetriesExhaustedException.class) |
| .withMessageContaining("Unable to complete request '/api/v1/keyspaces/" + |
| "badkeyspace/tables/bad_table/restore-jobs' after 1 attempt"); |
| |
| ObjectMapper mapper = new ObjectMapper(); |
| String expectedReqBodyString = mapper.writeValueAsString(requestPayload); |
| validateResponseServed(ApiEndpointsV1.CREATE_RESTORE_JOB_ROUTE |
| .replaceAll(KEYSPACE_PATH_PARAM, "badkeyspace") |
| .replaceAll(TABLE_PATH_PARAM, "bad_table") |
| .replaceAll(JOB_ID_PATH_PARAM, jobIdStr), |
| recordedRequest -> { |
| String reqBodyString = recordedRequest.getBody() |
| .readString(Charset.defaultCharset()); |
| assertThat(reqBodyString).isEqualTo(expectedReqBodyString); |
| }); |
| } |
| |
| private void enqueue(MockResponse response) |
| { |
| for (MockWebServer server : servers) |
| { |
| server.enqueue(response); |
| } |
| } |
| |
| private void validateResponseServed(String expectedEndpointPath) throws InterruptedException |
| { |
| validateResponseServed(expectedEndpointPath, req -> { |
| }); |
| } |
| |
| private void validateResponseServed(String expectedEndpointPath, |
| Consumer<RecordedRequest> serverReceivedRequestVerifier) throws InterruptedException |
| { |
| for (MockWebServer server : servers) |
| { |
| if (server.getRequestCount() > 0) |
| { |
| assertThat(server.getRequestCount()).isEqualTo(1); |
| RecordedRequest request = server.takeRequest(); |
| serverReceivedRequestVerifier.accept(request); |
| assertThat(request.getPath()).isEqualTo(expectedEndpointPath); |
| return; |
| } |
| } |
| fail("The request was not served by any of the provided servers"); |
| } |
| |
| private InputStream resourceInputStream(String name) |
| { |
| InputStream inputStream = getClass().getClassLoader().getResourceAsStream(name); |
| assertThat(inputStream).isNotNull(); |
| return inputStream; |
| } |
| |
| private Path prepareFile(Path tempDirectory) throws IOException |
| { |
| Path fileToUpload = tempDirectory.resolve("nb-1-big-TOC.txt"); |
| try (InputStream inputStream = resourceInputStream("sstables/nb-1-big-TOC.txt")) |
| { |
| Files.copy(inputStream, fileToUpload, StandardCopyOption.REPLACE_EXISTING); |
| } |
| return fileToUpload; |
| } |
| } |