Fix data corruption handling issues
Treat AssertionError as corruption, assert positive deletion timestamps and
TTLs and treat localDeletionTime < TTL as invalid.
patch by Branimir Lambov; reviewed by Berenguer Blasi for CASSANDRA-18676
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index a0450a0..45a5bf8 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -70,8 +70,7 @@
private DeletionTime(long markedForDeleteAt, long localDeletionTime)
{
- this.markedForDeleteAt = markedForDeleteAt;
- this.localDeletionTimeUnsignedInteger = Cell.deletionTimeLongToUnsignedInteger(localDeletionTime);
+ this(markedForDeleteAt, Cell.deletionTimeLongToUnsignedInteger(localDeletionTime));
}
private DeletionTime(long markedForDeleteAt, int localDeletionTimeUnsignedInteger)
@@ -116,12 +115,13 @@
}
/**
- * check if this deletion time is valid - localDeletionTime can never be negative
+ * Check if this deletion time is valid - markedForDeleteAt can only negative if the deletion is LIVE.
+ * localDeletionTime is not checked as it is stored as an unsigned int and cannot be negative.
* @return true if it is valid
*/
public boolean validate()
{
- return true;
+ return markedForDeleteAt >= 0 || isLive();
}
@Override
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index 9850d08..fffcca8 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -21,12 +21,14 @@
import java.nio.ByteBuffer;
import java.util.Comparator;
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.utils.CassandraUInt;
import org.apache.cassandra.utils.memory.ByteBufferCloner;
@@ -214,6 +216,29 @@
protected abstract int localDeletionTimeAsUnsignedInt();
/**
+ * Handle unsigned encoding and potentially invalid localDeletionTime.
+ */
+ public static long decodeLocalDeletionTime(long localDeletionTime, int ttl, DeserializationHelper helper)
+ {
+ if (localDeletionTime >= ttl)
+ return localDeletionTime; // fast path, positive and valid signed 32-bit integer
+
+ if (localDeletionTime < 0)
+ {
+ // Overflown signed int, decode to long. The result is guaranteed > ttl (and any signed int)
+ return helper.version < MessagingService.VERSION_50
+ ? INVALID_DELETION_TIME
+ : deletionTimeUnsignedIntegerToLong((int) localDeletionTime);
+ }
+
+ if (ttl == LivenessInfo.EXPIRED_LIVENESS_TTL)
+ return localDeletionTime; // ttl is already expired, localDeletionTime is valid
+ else
+ return INVALID_DELETION_TIME; // Invalid as it can't occur without corruption and would cause negative
+ // timestamp on expiry.
+ }
+
+ /**
* The serialization format for cell is:
* [ flags ][ timestamp ][ deletion time ][ ttl ][ path size ][ path ][ value size ][ value ]
* [ 1b ][ 8b (vint) ][ 4b (vint) ][ 4b (vint) ][ 4b (vint) ][ arb ][ 4b (vint) ][ arb ]
@@ -317,11 +342,11 @@
}
}
- if (localDeletionTime < 0)
- localDeletionTime = helper.version < MessagingService.VERSION_50
- ? INVALID_DELETION_TIME
- : deletionTimeUnsignedIntegerToLong((int) localDeletionTime);
-
+ if (timestamp < 0)
+ throw new IOException("Invalid negative timestamp: " + timestamp);
+ if (ttl < 0)
+ throw new IOException("Invalid TTL: " + ttl);
+ localDeletionTime = decodeLocalDeletionTime(localDeletionTime, ttl, helper);
return accessor.factory().cell(column, timestamp, ttl, localDeletionTime, value, path);
}
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index c1802cf..cfbcad1 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -589,12 +589,12 @@
if (hasTimestamp)
{
long timestamp = header.readTimestamp(in);
+ assert timestamp >= 0 : "Invalid negative timestamp " + timestamp;
int ttl = hasTTL ? header.readTTL(in) : LivenessInfo.NO_TTL;
+ assert ttl >= 0 : "Invalid TTL " + ttl;
long localDeletionTime = hasTTL ? header.readLocalDeletionTime(in) : LivenessInfo.NO_EXPIRATION_TIME;
- if (localDeletionTime < 0)
- localDeletionTime = helper.version < MessagingService.VERSION_50
- ? Cell.INVALID_DELETION_TIME
- : Cell.deletionTimeUnsignedIntegerToLong((int) localDeletionTime);
+
+ localDeletionTime = Cell.decodeLocalDeletionTime(localDeletionTime, ttl, helper);
rowLiveness = LivenessInfo.withExpirationTime(timestamp, ttl, localDeletionTime);
}
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index b8355fa..789bc4b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -138,7 +138,7 @@
{
return iterator.hasNext();
}
- catch (IndexOutOfBoundsException | VIntOutOfRangeException e)
+ catch (IndexOutOfBoundsException | VIntOutOfRangeException | AssertionError e)
{
sstable.markSuspect();
throw new CorruptSSTableException(e, filename);
@@ -163,7 +163,7 @@
{
return doCompute();
}
- catch (IndexOutOfBoundsException e)
+ catch (IndexOutOfBoundsException | VIntOutOfRangeException | AssertionError e)
{
sstable.markSuspect();
throw new CorruptSSTableException(e, filename);
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index f1efe97..16de3f0 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -798,21 +798,21 @@
long nowInSec = FBUtilities.nowInSeconds();
// A simple tombstone
- new RowUpdateBuilder(cfs.metadata(), 0, keys[0]).clustering("cc").delete("a").build().apply();
+ new RowUpdateBuilder(cfs.metadata(), 100, keys[0]).clustering("cc").delete("a").build().apply();
// Collection with an associated complex deletion
- PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0);
+ PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(100);
builder.row("cc").add("c", ImmutableSet.of("element1", "element2"));
builder.buildAsMutation().apply();
// RangeTombstone and a row (not covered by the RT). The row contains a regular tombstone which will not be
// purged. This is to prevent the partition from being fully purged and removed from the final results
- new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L, keys[2]).addRangeTombstone("aa", "bb").build().apply();
+ new RowUpdateBuilder(cfs.metadata(), nowInSec, 100L, keys[2]).addRangeTombstone("aa", "bb").build().apply();
new RowUpdateBuilder(cfs.metadata(), nowInSec+ 1000, 1000L, keys[2]).clustering("cc").delete("a").build().apply();
// Partition with 2 rows, one fully deleted
- new RowUpdateBuilder(cfs.metadata.get(), 0, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply();
- RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply();
+ new RowUpdateBuilder(cfs.metadata.get(), 100, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply();
+ RowUpdateBuilder.deleteRow(cfs.metadata(), 100, keys[3], "cc").apply();
Util.flush(cfs);
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
diff --git a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
index 26adfb1..dc78276 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
@@ -225,7 +225,7 @@
try
{
cfs.forceMajorCompaction();
- break;
+ break; // After all corrupted sstables are marked as such, compaction of the rest should succeed.
}
catch (Exception e)
{