CASSANDRA-19626 Fix NullPointerException when reading static column with null values (#58)

Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19626
diff --git a/CHANGES.txt b/CHANGES.txt
index c00c881..4c16a54 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * NullPointerException when reading static column with null values (CASSANDRA-19626)
  * Integrate with the latest sidecar client (CASSANDRA-19616)
  * Support bulk write via S3 (CASSANDRA-19563)
  * Support UDTs in the Bulk Writer (CASSANDRA-19340)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index c418d43..4b5ec5e 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -155,8 +155,7 @@
                                                                                              1,
                                                                                              snapshot,
                                                                                              keyspace,
-                                                                                             table,
-                                                                                             "abc1234",
+                                                                                             table + "-abc1234",
                                                                                              dataFileName);
         return new SidecarProvisionedSSTable(mockSidecarClient,
                                              sidecarClientConfig,
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index f5a5abd..795c758 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -171,6 +171,12 @@
     }
 
     @Override
+    public void awaitRingStatus(IInstance instance, IInstance expectedInRing, String status)
+    {
+        ClusterUtils.awaitRingStatus(instance, expectedInRing, status);
+    }
+
+    @Override
     public void awaitGossipStatus(IInstance instance, IInstance expectedInGossip, String targetStatus)
     {
         ClusterUtils.awaitGossipStatus(instance, expectedInGossip, targetStatus);
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
index c604e37..915d289 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
@@ -82,6 +82,15 @@
     void awaitRingState(IInstance instance, IInstance expectedInRing, String state);
 
     /**
+     * Wait for the ring to have the target instance with the provided status.
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing to look for
+     * @param status         expected
+     */
+    void awaitRingStatus(IInstance instance, IInstance expectedInRing, String status);
+
+    /**
      * Waits for the target instance to have the desired status. Target status is checked via string contains so works
      * with 'NORMAL' but also can check tokens or full state.
      *
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
similarity index 68%
rename from cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
rename to cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
index 88e215d..239af60 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
@@ -38,13 +38,40 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Test that reads different tables with different schemas within the same test
+ * Tests bulk reader functionality
  */
-class ReadDifferentTablesTest extends SharedClusterSparkIntegrationTestBase
+class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
 {
     static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
     QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
     QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE);
+    QualifiedName tableForNullStaticColumn = uniqueTestTableFullName(TEST_KEYSPACE);
+
+    @Test
+    void testReadNullStaticColumn()
+    {
+        Dataset<Row> data = bulkReaderDataFrame(tableForNullStaticColumn).load();
+
+        List<Row> rows = data.collectAsList().stream()
+                             .sorted(Comparator.comparing(row -> row.getString(0)))
+                             .collect(Collectors.toList());
+        assertThat(rows.size()).isEqualTo(DATASET.size());
+
+        for (int i = 0; i < DATASET.size(); i++)
+        {
+            Row row = rows.get(i);
+            assertThat(row.getString(0)).isEqualTo(DATASET.get(i));
+            assertThat(row.getTimestamp(1).getTime()).isEqualTo(1432815430948560L + i);
+            if (i % 2 == 0)
+            {
+                assertThat(row.getTimestamp(2)).as("Row " + (i + 1) + " is expected to have null timestamp").isNull();
+            }
+            else
+            {
+                assertThat(row.getTimestamp(2).getTime()).isEqualTo(1432815430948560L + i);
+            }
+        }
+    }
 
     @Test
     void testReadingFromTwoDifferentTables()
@@ -78,6 +105,8 @@
         createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
         createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (id int PRIMARY KEY, name text);");
         createTestTable(table2, "CREATE TABLE IF NOT EXISTS %s (name text PRIMARY KEY, value bigint);");
+        createTestTable(tableForNullStaticColumn, "CREATE TABLE %s (id text, timestamp timestamp,\n" +
+                                                  "   timestamp_static timestamp static, PRIMARY KEY (id, timestamp));");
 
         IInstance firstRunningInstance = cluster.getFirstRunningInstance();
         for (int i = 0; i < DATASET.size(); i++)
@@ -85,9 +114,13 @@
             String value = DATASET.get(i);
             String query1 = String.format("INSERT INTO %s (id, name) VALUES (%d, '%s');", table1, i, value);
             String query2 = String.format("INSERT INTO %s (name, value) VALUES ('%s', %d);", table2, value, i);
+            String query3 = String.format("INSERT INTO %s (id, timestamp, timestamp_static) VALUES ('%s',%d, %s)",
+                                          tableForNullStaticColumn, value, 1432815430948560L + i,
+                                          i % 2 == 0 ? "null" : String.valueOf(i + 1432815430948560L));
 
             firstRunningInstance.coordinator().execute(query1, ConsistencyLevel.ALL);
             firstRunningInstance.coordinator().execute(query2, ConsistencyLevel.ALL);
+            firstRunningInstance.coordinator().execute(query3, ConsistencyLevel.ALL);
         }
     }
 }
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
index 2d031cc..bd12bc1 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
@@ -260,13 +260,9 @@
         for (IInstance node : nodesToRemove)
         {
             cluster.stopUnchecked(node);
-            String remAddress = node.config().broadcastAddress().getAddress().getHostAddress();
-
-            List<ClusterUtils.RingInstanceDetails> ring = ClusterUtils.ring(seed);
-            List<ClusterUtils.RingInstanceDetails> match = ring.stream()
-                                                               .filter((d) -> d.getAddress().equals(remAddress))
-                                                               .collect(Collectors.toList());
-            assertThat(match.stream().anyMatch(r -> r.getStatus().equals("Down"))).isTrue();
+            // awaitRingStatus will assert that the node status is down. It retries multiple times until a timeout
+            // is reached and fails if the expected status is not seen.
+            cluster.awaitRingStatus(seed, node, "Down");
         }
     }
 
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index 1e1eac6..cbcd814 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -54,7 +54,8 @@
     @Override
     public Object deserialize(ByteBuffer buffer, boolean isFrozen)
     {
-        return toSparkSqlType(serializer().deserialize(buffer));
+        Object value = serializer().deserialize(buffer);
+        return value != null ? toSparkSqlType(value) : null;
     }
 
     public abstract <T> TypeSerializer<T> serializer();
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
index 9e0316f..385395e 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
@@ -88,7 +88,8 @@
     @Override
     public Object deserialize(ByteBuffer buffer, boolean isFrozen)
     {
-        return toSparkSqlType(serializer().deserialize(buffer));
+        Object value = serializer().deserialize(buffer);
+        return value != null ? toSparkSqlType(value) : null;
     }
 
     @Override
diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh
index 4e30386..194f5c9 100755
--- a/scripts/build-sidecar.sh
+++ b/scripts/build-sidecar.sh
@@ -24,7 +24,7 @@
   SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
   SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}"
   SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
-  SIDECAR_COMMIT="${SIDECAR_COMMIT:-55866caa2c4601b1d59a8532a97310a9e819931f}"
+  SIDECAR_COMMIT="${SIDECAR_COMMIT:-e95786a077e1137dcaae206854986987edc6a71e}"
   SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies"
   SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR}
   SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build"