CASSANDRA-19626 Fix NullPointerException when reading static column with null values (#58)
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19626
diff --git a/CHANGES.txt b/CHANGES.txt
index c00c881..4c16a54 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * NullPointerException when reading static column with null values (CASSANDRA-19626)
* Integrate with the latest sidecar client (CASSANDRA-19616)
* Support bulk write via S3 (CASSANDRA-19563)
* Support UDTs in the Bulk Writer (CASSANDRA-19340)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index c418d43..4b5ec5e 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -155,8 +155,7 @@
1,
snapshot,
keyspace,
- table,
- "abc1234",
+ table + "-abc1234",
dataFileName);
return new SidecarProvisionedSSTable(mockSidecarClient,
sidecarClientConfig,
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index f5a5abd..795c758 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -171,6 +171,12 @@
}
@Override
+ public void awaitRingStatus(IInstance instance, IInstance expectedInRing, String status)
+ {
+ ClusterUtils.awaitRingStatus(instance, expectedInRing, status);
+ }
+
+ @Override
public void awaitGossipStatus(IInstance instance, IInstance expectedInGossip, String targetStatus)
{
ClusterUtils.awaitGossipStatus(instance, expectedInGossip, targetStatus);
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
index c604e37..915d289 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
@@ -82,6 +82,15 @@
void awaitRingState(IInstance instance, IInstance expectedInRing, String state);
/**
+ * Wait for the ring to have the target instance with the provided status.
+ *
+ * @param instance the instance on which to inspect the ring
+ * @param expectedInRing the instance expected to be present in the ring
+ * @param status the expected ring status for that instance (e.g. "Down")
+ */
+ void awaitRingStatus(IInstance instance, IInstance expectedInRing, String status);
+
+ /**
* Waits for the target instance to have the desired status. Target status is checked via string contains so works
* with 'NORMAL' but also can check tokens or full state.
*
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
similarity index 68%
rename from cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
rename to cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
index 88e215d..239af60 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
@@ -38,13 +38,40 @@
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Test that reads different tables with different schemas within the same test
+ * Tests bulk reader functionality
*/
-class ReadDifferentTablesTest extends SharedClusterSparkIntegrationTestBase
+class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
{
static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE);
+ QualifiedName tableForNullStaticColumn = uniqueTestTableFullName(TEST_KEYSPACE);
+
+ @Test
+ void testReadNullStaticColumn()
+ {
+ Dataset<Row> data = bulkReaderDataFrame(tableForNullStaticColumn).load();
+
+ List<Row> rows = data.collectAsList().stream()
+ .sorted(Comparator.comparing(row -> row.getString(0)))
+ .collect(Collectors.toList());
+ assertThat(rows.size()).isEqualTo(DATASET.size());
+
+ for (int i = 0; i < DATASET.size(); i++)
+ {
+ Row row = rows.get(i);
+ assertThat(row.getString(0)).isEqualTo(DATASET.get(i));
+ assertThat(row.getTimestamp(1).getTime()).isEqualTo(1432815430948560L + i);
+ if (i % 2 == 0)
+ {
+ assertThat(row.getTimestamp(2)).as("Row " + (i + 1) + " is expected to have null timestamp").isNull();
+ }
+ else
+ {
+ assertThat(row.getTimestamp(2).getTime()).isEqualTo(1432815430948560L + i);
+ }
+ }
+ }
@Test
void testReadingFromTwoDifferentTables()
@@ -78,6 +105,8 @@
createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (id int PRIMARY KEY, name text);");
createTestTable(table2, "CREATE TABLE IF NOT EXISTS %s (name text PRIMARY KEY, value bigint);");
+ createTestTable(tableForNullStaticColumn, "CREATE TABLE %s (id text, timestamp timestamp,\n" +
+ " timestamp_static timestamp static, PRIMARY KEY (id, timestamp));");
IInstance firstRunningInstance = cluster.getFirstRunningInstance();
for (int i = 0; i < DATASET.size(); i++)
@@ -85,9 +114,13 @@
String value = DATASET.get(i);
String query1 = String.format("INSERT INTO %s (id, name) VALUES (%d, '%s');", table1, i, value);
String query2 = String.format("INSERT INTO %s (name, value) VALUES ('%s', %d);", table2, value, i);
+ String query3 = String.format("INSERT INTO %s (id, timestamp, timestamp_static) VALUES ('%s',%d, %s)",
+ tableForNullStaticColumn, value, 1432815430948560L + i,
+ i % 2 == 0 ? "null" : String.valueOf(i + 1432815430948560L));
firstRunningInstance.coordinator().execute(query1, ConsistencyLevel.ALL);
firstRunningInstance.coordinator().execute(query2, ConsistencyLevel.ALL);
+ firstRunningInstance.coordinator().execute(query3, ConsistencyLevel.ALL);
}
}
}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
index 2d031cc..bd12bc1 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
@@ -260,13 +260,9 @@
for (IInstance node : nodesToRemove)
{
cluster.stopUnchecked(node);
- String remAddress = node.config().broadcastAddress().getAddress().getHostAddress();
-
- List<ClusterUtils.RingInstanceDetails> ring = ClusterUtils.ring(seed);
- List<ClusterUtils.RingInstanceDetails> match = ring.stream()
- .filter((d) -> d.getAddress().equals(remAddress))
- .collect(Collectors.toList());
- assertThat(match.stream().anyMatch(r -> r.getStatus().equals("Down"))).isTrue();
+ // awaitRingStatus will assert that the node status is down. It retries multiple times until a timeout
+ // is reached and fails if the expected status is not seen.
+ cluster.awaitRingStatus(seed, node, "Down");
}
}
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index 1e1eac6..cbcd814 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -54,7 +54,8 @@
@Override
public Object deserialize(ByteBuffer buffer, boolean isFrozen)
{
- return toSparkSqlType(serializer().deserialize(buffer));
+ Object value = serializer().deserialize(buffer);
+ return value != null ? toSparkSqlType(value) : null;
}
public abstract <T> TypeSerializer<T> serializer();
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
index 9e0316f..385395e 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
@@ -88,7 +88,8 @@
@Override
public Object deserialize(ByteBuffer buffer, boolean isFrozen)
{
- return toSparkSqlType(serializer().deserialize(buffer));
+ Object value = serializer().deserialize(buffer);
+ return value != null ? toSparkSqlType(value) : null;
}
@Override
diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh
index 4e30386..194f5c9 100755
--- a/scripts/build-sidecar.sh
+++ b/scripts/build-sidecar.sh
@@ -24,7 +24,7 @@
SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}"
SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
- SIDECAR_COMMIT="${SIDECAR_COMMIT:-55866caa2c4601b1d59a8532a97310a9e819931f}"
+ SIDECAR_COMMIT="${SIDECAR_COMMIT:-e95786a077e1137dcaae206854986987edc6a71e}"
SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies"
SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR}
SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build"