Failure to read non-shallow pre-3.0 sstable index entries

patch by Robert Stupp; reviewed by T Jake Luciani for CASSANDRA-11763
diff --git a/CHANGES.txt b/CHANGES.txt
index b549c8e..1a662b8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,7 +14,7 @@
  * Always perform collision check before joining ring (CASSANDRA-10134)
  * SSTableWriter output discrepancy (CASSANDRA-11646)
  * Fix potential timeout in NativeTransportService.testConcurrentDestroys (CASSANDRA-10756)
- * Support large partitions on the 3.0 sstable format (CASSANDRA-11206)
+ * Support large partitions on the 3.0 sstable format (CASSANDRA-11206,11763)
  * Add support to rebuild from specific range (CASSANDRA-10406)
  * Optimize the overlapping lookup by calculating all the
    bounds in advance (CASSANDRA-11571)
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 7fda245..dd1fdb7 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -287,7 +287,7 @@
                 case CACHE_NOT_INDEXED:
                     return new RowIndexEntry<>(position);
                 case CACHE_INDEXED:
-                    return new IndexedEntry(position, in, idxInfoSerializer, version, true);
+                    return new IndexedEntry(position, in, idxInfoSerializer, version);
                 case CACHE_INDEXED_SHALLOW:
                     return new ShallowIndexedEntry(position, in, idxInfoSerializer);
                 default:
@@ -318,7 +318,7 @@
         public RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition) throws IOException
         {
             if (!version.storeRows())
-                return LegacyShallowIndexedEntry.deserialize(in, indexFilePosition, idxInfoSerializer, version);
+                return LegacyShallowIndexedEntry.deserialize(in, indexFilePosition, idxInfoSerializer);
 
             long position = in.readUnsignedVInt();
 
@@ -494,7 +494,7 @@
         }
 
         public static RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition,
-                                                IndexInfo.Serializer idxInfoSerializer, Version version) throws IOException
+                                                IndexInfo.Serializer idxInfoSerializer) throws IOException
         {
             long dataFilePosition = in.readLong();
 
@@ -505,7 +505,7 @@
             }
             else if (size <= DatabaseDescriptor.getColumnIndexCacheSize())
             {
-                return new IndexedEntry(dataFilePosition, in, idxInfoSerializer, version, false);
+                return new IndexedEntry(dataFilePosition, in, idxInfoSerializer);
             }
             else
             {
@@ -636,7 +636,10 @@
             this.idxInfoSerializer = idxInfoSerializer;
         }
 
-        private IndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer, Version version, boolean forCache) throws IOException
+        /**
+         * Constructor called from {@link Serializer#deserializeForCache(org.apache.cassandra.io.util.DataInputPlus)}.
+         */
+        private IndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer, Version version) throws IOException
         {
             super(dataFilePosition);
 
@@ -650,14 +653,37 @@
             for (int i = 0; i < columnsIndexCount; i++)
                 this.columnsIndex[i] = idxInfoSerializer.deserialize(trackedIn);
 
-            int[] offsets = null;
-            if (!forCache && version.storeRows())
+            this.offsets = null;
+
+            this.indexedPartSize = (int) trackedIn.getBytesRead();
+
+            this.idxInfoSerializer = idxInfoSerializer;
+        }
+
+        /**
+         * Constructor called from {@link LegacyShallowIndexedEntry#deserialize(org.apache.cassandra.io.util.DataInputPlus, long, org.apache.cassandra.io.sstable.IndexInfo.Serializer)}.
+         * Only for legacy sstables.
+         */
+        private IndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer) throws IOException
+        {
+            super(dataFilePosition);
+
+            long headerLength = 0;
+            this.deletionTime = DeletionTime.serializer.deserialize(in);
+            int columnsIndexCount = in.readInt();
+
+            TrackedDataInputPlus trackedIn = new TrackedDataInputPlus(in);
+
+            this.columnsIndex = new IndexInfo[columnsIndexCount];
+            for (int i = 0; i < columnsIndexCount; i++)
             {
-                offsets = new int[this.columnsIndex.length];
-                for (int i = 0; i < offsets.length; i++)
-                    offsets[i] = trackedIn.readInt();
+                this.columnsIndex[i] = idxInfoSerializer.deserialize(trackedIn);
+                if (i == 0)
+                    headerLength = this.columnsIndex[i].offset;
             }
-            this.offsets = offsets;
+            this.headerLength = headerLength;
+
+            this.offsets = null;
 
             this.indexedPartSize = (int) trackedIn.getBytesRead();
 
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 4b9a769..bbca282 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -36,6 +36,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -126,14 +127,31 @@
     @Test
     public void testLoadLegacyCqlTables() throws Exception
     {
+        DatabaseDescriptor.setColumnIndexCacheSize(99999);
+        CacheService.instance.invalidateKeyCache();
+        doTestLegacyCqlTables();
+    }
+
+    @Test
+    public void testLoadLegacyCqlTablesShallow() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        CacheService.instance.invalidateKeyCache();
+        doTestLegacyCqlTables();
+    }
+
+    private void doTestLegacyCqlTables() throws Exception
+    {
         for (String legacyVersion : legacyVersions)
         {
             logger.info("Loading legacy version: {}", legacyVersion);
+            truncateLegacyTables(legacyVersion);
             loadLegacyTables(legacyVersion);
             CacheService.instance.invalidateKeyCache();
             long startCount = CacheService.instance.keyCache.size();
             verifyReads(legacyVersion);
             verifyCache(legacyVersion, startCount);
+            compactLegacyTables(legacyVersion);
         }
     }
 
@@ -175,6 +193,30 @@
                                              .execute().get();
     }
 
+    private static void truncateLegacyTables(String legacyVersion) throws Exception
+    {
+        for (int compact = 0; compact <= 1; compact++)
+        {
+            logger.info("Truncating legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+            Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact))).truncateBlocking();
+            Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact))).truncateBlocking();
+            Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact))).truncateBlocking();
+            Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact))).truncateBlocking();
+        }
+    }
+
+    private static void compactLegacyTables(String legacyVersion) throws Exception
+    {
+        for (int compact = 0; compact <= 1; compact++)
+        {
+            logger.info("Compacting legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+            Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact))).forceMajorCompaction();
+            Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact))).forceMajorCompaction();
+            Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact))).forceMajorCompaction();
+            Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact))).forceMajorCompaction();
+        }
+    }
+
     private static void loadLegacyTables(String legacyVersion) throws Exception
     {
         for (int compact = 0; compact <= 1; compact++)