Fix data corruption handling issues

Treat AssertionError as corruption, assert positive deletion timestamps and
TTLs and treat localDeletionTime < TTL as invalid.

patch by Branimir Lambov; reviewed by Berenguer Blasi for CASSANDRA-18676
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index a0450a0..45a5bf8 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -70,8 +70,7 @@
 
     private DeletionTime(long markedForDeleteAt, long localDeletionTime)
     {
-        this.markedForDeleteAt = markedForDeleteAt;
-        this.localDeletionTimeUnsignedInteger = Cell.deletionTimeLongToUnsignedInteger(localDeletionTime);
+        this(markedForDeleteAt, Cell.deletionTimeLongToUnsignedInteger(localDeletionTime));
     }
 
     private DeletionTime(long markedForDeleteAt, int localDeletionTimeUnsignedInteger)
@@ -116,12 +115,13 @@
     }
 
     /**
-     * check if this deletion time is valid - localDeletionTime can never be negative
+     * Check if this deletion time is valid - markedForDeleteAt can only negative if the deletion is LIVE.
+     * localDeletionTime is not checked as it is stored as an unsigned int and cannot be negative.
      * @return true if it is valid
      */
     public boolean validate()
     {
-        return true;
+        return markedForDeleteAt >= 0 || isLive();
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index 9850d08..fffcca8 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -21,12 +21,14 @@
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.CassandraUInt;
 import org.apache.cassandra.utils.memory.ByteBufferCloner;
@@ -214,6 +216,29 @@
     protected abstract int localDeletionTimeAsUnsignedInt();
 
     /**
+     * Handle unsigned encoding and potentially invalid localDeletionTime.
+     */
+    public static long decodeLocalDeletionTime(long localDeletionTime, int ttl, DeserializationHelper helper)
+    {
+        if (localDeletionTime >= ttl)
+            return localDeletionTime;   // fast path, positive and valid signed 32-bit integer
+
+        if (localDeletionTime < 0)
+        {
+            // Overflown signed int, decode to long. The result is guaranteed > ttl (and any signed int)
+            return helper.version < MessagingService.VERSION_50
+                   ? INVALID_DELETION_TIME
+                   : deletionTimeUnsignedIntegerToLong((int) localDeletionTime);
+        }
+
+        if (ttl == LivenessInfo.EXPIRED_LIVENESS_TTL)
+            return localDeletionTime;   // ttl is already expired, localDeletionTime is valid
+        else
+            return INVALID_DELETION_TIME;  // Invalid as it can't occur without corruption and would cause negative
+                                           // timestamp on expiry.
+    }
+
+    /**
      * The serialization format for cell is:
      *     [ flags ][ timestamp ][ deletion time ][    ttl    ][ path size ][ path ][ value size ][ value ]
      *     [   1b  ][ 8b (vint) ][   4b (vint)   ][ 4b (vint) ][ 4b (vint) ][  arb ][  4b (vint) ][  arb  ]
@@ -317,11 +342,11 @@
                 }
             }
 
-            if (localDeletionTime < 0)
-                localDeletionTime = helper.version < MessagingService.VERSION_50
-                                    ? INVALID_DELETION_TIME
-                                    : deletionTimeUnsignedIntegerToLong((int) localDeletionTime);
-
+            if (timestamp < 0)
+                throw new IOException("Invalid negative timestamp: " + timestamp);
+            if (ttl < 0)
+                throw new IOException("Invalid TTL: " + ttl);
+            localDeletionTime = decodeLocalDeletionTime(localDeletionTime, ttl, helper);
             return accessor.factory().cell(column, timestamp, ttl, localDeletionTime, value, path);
         }
 
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index c1802cf..cfbcad1 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -589,12 +589,12 @@
             if (hasTimestamp)
             {
                 long timestamp = header.readTimestamp(in);
+                assert timestamp >= 0 : "Invalid negative timestamp " + timestamp;
                 int ttl = hasTTL ? header.readTTL(in) : LivenessInfo.NO_TTL;
+                assert ttl >= 0 : "Invalid TTL " + ttl;
                 long localDeletionTime = hasTTL ? header.readLocalDeletionTime(in) : LivenessInfo.NO_EXPIRATION_TIME;
-                if (localDeletionTime < 0)
-                    localDeletionTime = helper.version < MessagingService.VERSION_50
-                                        ? Cell.INVALID_DELETION_TIME
-                                        : Cell.deletionTimeUnsignedIntegerToLong((int) localDeletionTime);
+
+                localDeletionTime = Cell.decodeLocalDeletionTime(localDeletionTime, ttl, helper);
 
                 rowLiveness = LivenessInfo.withExpirationTime(timestamp, ttl, localDeletionTime);
             }
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index b8355fa..789bc4b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -138,7 +138,7 @@
         {
             return iterator.hasNext();
         }
-        catch (IndexOutOfBoundsException | VIntOutOfRangeException e)
+        catch (IndexOutOfBoundsException | VIntOutOfRangeException | AssertionError e)
         {
             sstable.markSuspect();
             throw new CorruptSSTableException(e, filename);
@@ -163,7 +163,7 @@
         {
             return doCompute();
         }
-        catch (IndexOutOfBoundsException e)
+        catch (IndexOutOfBoundsException | VIntOutOfRangeException | AssertionError e)
         {
             sstable.markSuspect();
             throw new CorruptSSTableException(e, filename);
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index f1efe97..16de3f0 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -798,21 +798,21 @@
         long nowInSec = FBUtilities.nowInSeconds();
 
         // A simple tombstone
-        new RowUpdateBuilder(cfs.metadata(), 0, keys[0]).clustering("cc").delete("a").build().apply();
+        new RowUpdateBuilder(cfs.metadata(), 100, keys[0]).clustering("cc").delete("a").build().apply();
 
         // Collection with an associated complex deletion
-        PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0);
+        PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(100);
         builder.row("cc").add("c", ImmutableSet.of("element1", "element2"));
         builder.buildAsMutation().apply();
 
         // RangeTombstone and a row (not covered by the RT). The row contains a regular tombstone which will not be
         // purged. This is to prevent the partition from being fully purged and removed from the final results
-        new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L, keys[2]).addRangeTombstone("aa", "bb").build().apply();
+        new RowUpdateBuilder(cfs.metadata(), nowInSec, 100L, keys[2]).addRangeTombstone("aa", "bb").build().apply();
         new RowUpdateBuilder(cfs.metadata(), nowInSec+ 1000, 1000L, keys[2]).clustering("cc").delete("a").build().apply();
 
         // Partition with 2 rows, one fully deleted
-        new RowUpdateBuilder(cfs.metadata.get(), 0, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply();
-        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply();
+        new RowUpdateBuilder(cfs.metadata.get(), 100, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply();
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 100, keys[3], "cc").apply();
         Util.flush(cfs);
         cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
 
diff --git a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
index 26adfb1..dc78276 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
@@ -225,7 +225,7 @@
             try
             {
                 cfs.forceMajorCompaction();
-                break;
+                break; // After all corrupted sstables are marked as such, compaction of the rest should succeed.
             }
             catch (Exception e)
             {