Fix failure to read non-shallow pre-3.0 sstable index entries
patch by Robert Stupp; reviewed by T Jake Luciani for CASSANDRA-11763
diff --git a/CHANGES.txt b/CHANGES.txt
index b549c8e..1a662b8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,7 +14,7 @@
* Always perform collision check before joining ring (CASSANDRA-10134)
* SSTableWriter output discrepancy (CASSANDRA-11646)
* Fix potential timeout in NativeTransportService.testConcurrentDestroys (CASSANDRA-10756)
- * Support large partitions on the 3.0 sstable format (CASSANDRA-11206)
+ * Support large partitions on the 3.0 sstable format (CASSANDRA-11206,11763)
* Add support to rebuild from specific range (CASSANDRA-10406)
* Optimize the overlapping lookup by calculating all the
bounds in advance (CASSANDRA-11571)
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 7fda245..dd1fdb7 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -287,7 +287,7 @@
case CACHE_NOT_INDEXED:
return new RowIndexEntry<>(position);
case CACHE_INDEXED:
- return new IndexedEntry(position, in, idxInfoSerializer, version, true);
+ return new IndexedEntry(position, in, idxInfoSerializer, version);
case CACHE_INDEXED_SHALLOW:
return new ShallowIndexedEntry(position, in, idxInfoSerializer);
default:
@@ -318,7 +318,7 @@
public RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition) throws IOException
{
if (!version.storeRows())
- return LegacyShallowIndexedEntry.deserialize(in, indexFilePosition, idxInfoSerializer, version);
+ return LegacyShallowIndexedEntry.deserialize(in, indexFilePosition, idxInfoSerializer);
long position = in.readUnsignedVInt();
@@ -494,7 +494,7 @@
}
public static RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition,
- IndexInfo.Serializer idxInfoSerializer, Version version) throws IOException
+ IndexInfo.Serializer idxInfoSerializer) throws IOException
{
long dataFilePosition = in.readLong();
@@ -505,7 +505,7 @@
}
else if (size <= DatabaseDescriptor.getColumnIndexCacheSize())
{
- return new IndexedEntry(dataFilePosition, in, idxInfoSerializer, version, false);
+ return new IndexedEntry(dataFilePosition, in, idxInfoSerializer);
}
else
{
@@ -636,7 +636,10 @@
this.idxInfoSerializer = idxInfoSerializer;
}
- private IndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer, Version version, boolean forCache) throws IOException
+ /**
+ * Constructor called from {@link Serializer#deserializeForCache(org.apache.cassandra.io.util.DataInputPlus)}.
+ */
+ private IndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer, Version version) throws IOException
{
super(dataFilePosition);
@@ -650,14 +653,37 @@
for (int i = 0; i < columnsIndexCount; i++)
this.columnsIndex[i] = idxInfoSerializer.deserialize(trackedIn);
- int[] offsets = null;
- if (!forCache && version.storeRows())
+ this.offsets = null;
+
+ this.indexedPartSize = (int) trackedIn.getBytesRead();
+
+ this.idxInfoSerializer = idxInfoSerializer;
+ }
+
+ /**
+ * Constructor called from {@link LegacyShallowIndexedEntry#deserialize(org.apache.cassandra.io.util.DataInputPlus, long, org.apache.cassandra.io.sstable.IndexInfo.Serializer)}.
+ * Only for legacy (pre-3.0) sstables; the header length is taken from the offset of the first index entry.
+ */
+ private IndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer) throws IOException
+ {
+ super(dataFilePosition);
+
+ long headerLength = 0;
+ this.deletionTime = DeletionTime.serializer.deserialize(in);
+ int columnsIndexCount = in.readInt();
+
+ TrackedDataInputPlus trackedIn = new TrackedDataInputPlus(in);
+
+ this.columnsIndex = new IndexInfo[columnsIndexCount];
+ for (int i = 0; i < columnsIndexCount; i++)
{
- offsets = new int[this.columnsIndex.length];
- for (int i = 0; i < offsets.length; i++)
- offsets[i] = trackedIn.readInt();
+ this.columnsIndex[i] = idxInfoSerializer.deserialize(trackedIn);
+ if (i == 0)
+ headerLength = this.columnsIndex[i].offset;
}
- this.offsets = offsets;
+ this.headerLength = headerLength;
+
+ this.offsets = null;
this.indexedPartSize = (int) trackedIn.getBytesRead();
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 4b9a769..bbca282 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -36,6 +36,7 @@
import org.slf4j.LoggerFactory;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -126,14 +127,31 @@
@Test
public void testLoadLegacyCqlTables() throws Exception
{
+ DatabaseDescriptor.setColumnIndexCacheSize(99999);
+ CacheService.instance.invalidateKeyCache();
+ doTestLegacyCqlTables();
+ }
+
+ @Test
+ public void testLoadLegacyCqlTablesShallow() throws Exception
+ {
+ DatabaseDescriptor.setColumnIndexCacheSize(0);
+ CacheService.instance.invalidateKeyCache();
+ doTestLegacyCqlTables();
+ }
+
+ private void doTestLegacyCqlTables() throws Exception
+ {
for (String legacyVersion : legacyVersions)
{
logger.info("Loading legacy version: {}", legacyVersion);
+ truncateLegacyTables(legacyVersion);
loadLegacyTables(legacyVersion);
CacheService.instance.invalidateKeyCache();
long startCount = CacheService.instance.keyCache.size();
verifyReads(legacyVersion);
verifyCache(legacyVersion, startCount);
+ compactLegacyTables(legacyVersion);
}
}
@@ -175,6 +193,30 @@
.execute().get();
}
+ private static void truncateLegacyTables(String legacyVersion) throws Exception
+ {
+ for (int compact = 0; compact <= 1; compact++)
+ {
+ logger.info("Truncating legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+ Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact))).truncateBlocking();
+ Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact))).truncateBlocking();
+ Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact))).truncateBlocking();
+ Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact))).truncateBlocking();
+ }
+ }
+
+ private static void compactLegacyTables(String legacyVersion) throws Exception
+ {
+ for (int compact = 0; compact <= 1; compact++)
+ {
+ logger.info("Compacting legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+ Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact))).forceMajorCompaction();
+ Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact))).forceMajorCompaction();
+ Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact))).forceMajorCompaction();
+ Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact))).forceMajorCompaction();
+ }
+ }
+
private static void loadLegacyTables(String legacyVersion) throws Exception
{
for (int compact = 0; compact <= 1; compact++)