Fix missing state resetting on CompressedRandomAccessReader read errors

patch by Runtian Liu; reviewed by Andrés de la Peña and Berenguer Blasi for CASSANDRA-17314
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f9a161..bc45ccd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.28
+ * Fix missing state resetting on CompressedRandomAccessReader read errors (CASSANDRA-17314)
  * Fix restarting of services on gossipping-only member (CASSANDRA-17752)
  * Fix writetime and ttl functions forbidden for collections instead of multicell columns (CASSANDRA-17628)
  * Supress CVE-2020-7238 (CASSANDRA-17697)
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 2dbb013..daaffd3 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -118,14 +118,9 @@
             if (getCrcCheckChance() >= 1d ||
                     getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
             {
-                metadata.checksumType.update(checksum, (compressed));
-
-                if (checksum(chunk) != (int) checksum.getValue())
+                if (checksum(chunk) != (int) metadata.checksumType.of(compressed))
                     throw new CorruptBlockException(getPath(), chunk);
 
-                // reset checksum object back to the original (blank) state
-                checksum.reset();
-
                 compressed.rewind();
             }
 
@@ -152,6 +147,8 @@
         }
         catch (CorruptBlockException e)
         {
+            // Make sure reader does not see stale data.
+            buffer.position(0).limit(0);
             throw new CorruptSSTableException(e, getPath());
         }
         catch (IOException e)
@@ -182,15 +179,11 @@
             if (getCrcCheckChance() >= 1d ||
                 getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
             {
-                metadata.checksumType.update( checksum, compressedChunk);
-
+                int checksum = (int) metadata.checksumType.of(compressedChunk);
                 compressedChunk.limit(compressedChunk.capacity());
-                if (compressedChunk.getInt() != (int) checksum.getValue())
+                if (compressedChunk.getInt() != checksum)
                     throw new CorruptBlockException(getPath(), chunk);
 
-                // reset checksum object back to the original (blank) state
-                checksum.reset();
-
                 compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
             }
 
@@ -217,6 +210,8 @@
         }
         catch (CorruptBlockException e)
         {
+            // Make sure reader does not see stale data.
+            buffer.position(0).limit(0);
             throw new CorruptSSTableException(e, getPath());
         }
 
diff --git a/src/java/org/apache/cassandra/utils/ChecksumType.java b/src/java/org/apache/cassandra/utils/ChecksumType.java
index c9a1eb8..6375fa3 100644
--- a/src/java/org/apache/cassandra/utils/ChecksumType.java
+++ b/src/java/org/apache/cassandra/utils/ChecksumType.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.utils;
 
+import io.netty.util.concurrent.FastThreadLocal;
+
 import java.nio.ByteBuffer;
 import java.util.zip.Checksum;
 import java.util.zip.CRC32;
@@ -60,4 +62,20 @@
     public abstract Checksum newInstance();
 
     public abstract void update(Checksum checksum, ByteBuffer buf);
+
+    private FastThreadLocal<Checksum> instances = new FastThreadLocal<Checksum>()
+    {
+        protected Checksum initialValue() throws Exception
+        {
+            return newInstance();
+        }
+    };
+
+    public long of(ByteBuffer buf)
+    {
+        Checksum checksum = instances.get();
+        checksum.reset();
+        update(checksum, buf);
+        return checksum.getValue();
+    }
 }