CASSANDRA-19582: Consume new Sidecar client API to stream SSTables (#54)


Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19582
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 2db19d5..8bc9a7f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -29,6 +29,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -155,7 +156,7 @@
         Optional<Integer> sidecarPortFromOptions = MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar port");
         this.userProvidedSidecarPort = sidecarPortFromOptions.isPresent() ? sidecarPortFromOptions.get() : getOptionalInt(SIDECAR_PORT).orElse(-1);
         this.effectiveSidecarPort = this.userProvidedSidecarPort == -1 ? DEFAULT_SIDECAR_PORT : this.userProvidedSidecarPort;
-        this.sidecarInstancesValue = MapUtils.getOrThrow(options, WriterOptions.SIDECAR_INSTANCES.name(), "sidecar_instances");
+        this.sidecarInstancesValue = MapUtils.getOrDefault(options, WriterOptions.SIDECAR_INSTANCES.name(), null);
         this.sidecarInstances = sidecarInstances();
         this.keyspace = MapUtils.getOrThrow(options, WriterOptions.KEYSPACE.name());
         this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name());
@@ -264,7 +265,9 @@
 
     protected Set<? extends SidecarInstance> buildSidecarInstances()
     {
-        return Arrays.stream(sidecarInstancesValue.split(","))
+        String[] split = Objects.requireNonNull(sidecarInstancesValue, "Unable to build sidecar instances from null value")
+                                .split(",");
+        return Arrays.stream(split)
                      .map(hostname -> new SidecarInstanceImpl(hostname, effectiveSidecarPort))
                      .collect(Collectors.toSet());
     }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index 6e4ff0f..db9e2fd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -124,7 +124,7 @@
         {
             return null;
         }
-        return openStream(snapshotFile.fileName, snapshotFile.size, fileType);
+        return openStream(snapshotFile, fileType);
     }
 
     public long length(FileType fileType)
@@ -144,20 +144,20 @@
     }
 
     @Nullable
-    private InputStream openStream(String component, long size, FileType fileType)
+    private InputStream openStream(ListSnapshotFilesResponse.FileInfo snapshotFile, FileType fileType)
     {
-        if (component == null)
+        if (snapshotFile == null)
         {
             return null;
         }
 
         if (fileType == FileType.COMPRESSION_INFO)
         {
-            String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, component);
+            String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, snapshotFile.fileName);
             byte[] bytes;
             try
             {
-                bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(component, fileType, size)));
+                bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(snapshotFile, fileType)));
             }
             catch (ExecutionException exception)
             {
@@ -166,24 +166,23 @@
             return new ByteArrayInputStream(bytes);
         }
 
-        return open(component, fileType, size);
+        return open(snapshotFile, fileType);
     }
 
-    public InputStream open(String component, FileType fileType, long size)
+    public InputStream open(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType)
     {
-        SSTableSource<SidecarProvisionedSSTable> ssTableSource = source(component, fileType, size);
+        SSTableSource<SidecarProvisionedSSTable> ssTableSource = source(fileInfo, fileType);
         return new SSTableInputStream<>(ssTableSource, stats);
     }
 
     /**
      * Build an SSTableSource to async provide the bytes
      *
-     * @param componentName the SSTable component to stream
-     * @param fileType      SSTable file type
-     * @param size          file size in bytes
+     * @param fileInfo contains information about the file to stream
+     * @param fileType SSTable file type
      * @return an SSTableSource implementation that uses Sidecar client to request bytes
      */
-    private SSTableSource<SidecarProvisionedSSTable> source(String componentName, FileType fileType, long size)
+    private SSTableSource<SidecarProvisionedSSTable> source(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType)
     {
         SidecarProvisionedSSTable thisSSTable = this;
         return new SSTableSource<SidecarProvisionedSSTable>()
@@ -191,12 +190,7 @@
             @Override
             public void request(long start, long end, StreamConsumer consumer)
             {
-                sidecar.streamSSTableComponent(instance,
-                                               keyspace,
-                                               table,
-                                               snapshotName,
-                                               componentName,
-                                               HttpRange.of(start, end),
+                sidecar.streamSSTableComponent(instance, fileInfo, HttpRange.of(start, end),
                                                new SidecarStreamConsumerAdapter(consumer));
             }
 
@@ -235,7 +229,7 @@
             @Override
             public long size()
             {
-                return size;
+                return fileInfo.size;
             }
         };
     }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index 0ce0e4f..c24c5e8 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -156,6 +156,7 @@
                                                                                              snapshot,
                                                                                              keyspace,
                                                                                              table,
+                                                                                             "abc1234",
                                                                                              dataFileName);
         return new SidecarProvisionedSSTable(mockSidecarClient,
                                              sidecarClientConfig,
diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh
index 09a59ec..a639701 100755
--- a/scripts/build-sidecar.sh
+++ b/scripts/build-sidecar.sh
@@ -24,7 +24,7 @@
   SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
   SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}"
   SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
-  SIDECAR_COMMIT="${SIDECAR_COMMIT:-fd6f7ac5f9f19dbbeeb9e7f80ca1fcbf60d5a4c6}"
+  SIDECAR_COMMIT="${SIDECAR_COMMIT:-4a6b8c9cfe0c6286d12c7d561941a24c25a206ef}"
   SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies"
   SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR}
   SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build"