CASSANDRA-19582: Consume new Sidecar client API to stream SSTables (#54)
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19582
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 2db19d5..8bc9a7f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -29,6 +29,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -155,7 +156,7 @@
Optional<Integer> sidecarPortFromOptions = MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar port");
this.userProvidedSidecarPort = sidecarPortFromOptions.isPresent() ? sidecarPortFromOptions.get() : getOptionalInt(SIDECAR_PORT).orElse(-1);
this.effectiveSidecarPort = this.userProvidedSidecarPort == -1 ? DEFAULT_SIDECAR_PORT : this.userProvidedSidecarPort;
- this.sidecarInstancesValue = MapUtils.getOrThrow(options, WriterOptions.SIDECAR_INSTANCES.name(), "sidecar_instances");
+ this.sidecarInstancesValue = MapUtils.getOrDefault(options, WriterOptions.SIDECAR_INSTANCES.name(), null);
this.sidecarInstances = sidecarInstances();
this.keyspace = MapUtils.getOrThrow(options, WriterOptions.KEYSPACE.name());
this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name());
@@ -264,7 +265,9 @@
protected Set<? extends SidecarInstance> buildSidecarInstances()
{
- return Arrays.stream(sidecarInstancesValue.split(","))
+ String[] split = Objects.requireNonNull(sidecarInstancesValue, "Unable to build sidecar instances from null value")
+ .split(",");
+ return Arrays.stream(split)
.map(hostname -> new SidecarInstanceImpl(hostname, effectiveSidecarPort))
.collect(Collectors.toSet());
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index 6e4ff0f..db9e2fd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -124,7 +124,7 @@
{
return null;
}
- return openStream(snapshotFile.fileName, snapshotFile.size, fileType);
+ return openStream(snapshotFile, fileType);
}
public long length(FileType fileType)
@@ -144,20 +144,20 @@
}
@Nullable
- private InputStream openStream(String component, long size, FileType fileType)
+ private InputStream openStream(ListSnapshotFilesResponse.FileInfo snapshotFile, FileType fileType)
{
- if (component == null)
+ if (snapshotFile == null)
{
return null;
}
if (fileType == FileType.COMPRESSION_INFO)
{
- String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, component);
+ String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, snapshotFile.fileName);
byte[] bytes;
try
{
- bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(component, fileType, size)));
+ bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(snapshotFile, fileType)));
}
catch (ExecutionException exception)
{
@@ -166,24 +166,23 @@
return new ByteArrayInputStream(bytes);
}
- return open(component, fileType, size);
+ return open(snapshotFile, fileType);
}
- public InputStream open(String component, FileType fileType, long size)
+ public InputStream open(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType)
{
- SSTableSource<SidecarProvisionedSSTable> ssTableSource = source(component, fileType, size);
+ SSTableSource<SidecarProvisionedSSTable> ssTableSource = source(fileInfo, fileType);
return new SSTableInputStream<>(ssTableSource, stats);
}
/**
* Build an SSTableSource to async provide the bytes
*
- * @param componentName the SSTable component to stream
- * @param fileType SSTable file type
- * @param size file size in bytes
+ * @param fileInfo contains information about the file to stream
+ * @param fileType SSTable file type
* @return an SSTableSource implementation that uses Sidecar client to request bytes
*/
- private SSTableSource<SidecarProvisionedSSTable> source(String componentName, FileType fileType, long size)
+ private SSTableSource<SidecarProvisionedSSTable> source(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType)
{
SidecarProvisionedSSTable thisSSTable = this;
return new SSTableSource<SidecarProvisionedSSTable>()
@@ -191,12 +190,7 @@
@Override
public void request(long start, long end, StreamConsumer consumer)
{
- sidecar.streamSSTableComponent(instance,
- keyspace,
- table,
- snapshotName,
- componentName,
- HttpRange.of(start, end),
+ sidecar.streamSSTableComponent(instance, fileInfo, HttpRange.of(start, end),
new SidecarStreamConsumerAdapter(consumer));
}
@@ -235,7 +229,7 @@
@Override
public long size()
{
- return size;
+ return fileInfo.size;
}
};
}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index 0ce0e4f..c24c5e8 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -156,6 +156,7 @@
snapshot,
keyspace,
table,
+ "abc1234",
dataFileName);
return new SidecarProvisionedSSTable(mockSidecarClient,
sidecarClientConfig,
diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh
index 09a59ec..a639701 100755
--- a/scripts/build-sidecar.sh
+++ b/scripts/build-sidecar.sh
@@ -24,7 +24,7 @@
SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}"
SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
- SIDECAR_COMMIT="${SIDECAR_COMMIT:-fd6f7ac5f9f19dbbeeb9e7f80ca1fcbf60d5a4c6}"
+ SIDECAR_COMMIT="${SIDECAR_COMMIT:-4a6b8c9cfe0c6286d12c7d561941a24c25a206ef}"
SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies"
SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR}
SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build"