Garbage-collecting compaction operation and schema option.

patch by Branimir Lambov; reviewed by Marcus Eriksson for CASSANDRA-7019
diff --git a/CHANGES.txt b/CHANGES.txt
index 36a21e6..760cc58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
  * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
  * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
  * Support filtering on non-PRIMARY KEY columns in the CREATE
diff --git a/NEWS.txt b/NEWS.txt
index 8580c7c..85f2767 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,28 @@
 
 New features
 ------------
+   - Compaction can now take into account overlapping tables that don't take part
+     in the compaction, in order to look for deleted or overwritten data in the
+     compacted tables. When such data is found, it can be safely discarded, which
+     in turn should enable the removal of tombstones over that data.
+
+     The behavior can be engaged in two ways:
+       - as a "nodetool garbagecollect -g CELL/ROW" operation, which applies
+         a single-sstable compaction to every sstable, discarding deleted data in one step.
+       - as a "provide_overlapping_tombstones:CELL/ROW/NONE" compaction strategy flag,
+         which uses overlapping tables as a source of deletions/overwrites during all
+         compactions.
+     The argument specifies the granularity at which deleted data is to be found:
+       - If ROW is specified, only whole deleted rows (or sets of rows) will be
+         discarded.
+       - If CELL is specified, any columns whose value is overwritten or deleted
+         will also be discarded.
+       - NONE (default) specifies the old behavior: overlapping tables are not used
+         to decide when to discard data.
+     Which option to use depends on your workload: both ROW and CELL increase the
+     disk load on compaction (especially with the size-tiered compaction strategy),
+     with CELL being more resource-intensive. Both should lead to better read
+     performance if deleting rows (or, for CELL, overwriting/deleting cells) is common.
    - Prepared statements are now persisted in the table prepared_statements in
      the system keyspace. Upon startup, this table is used to preload all
      previously prepared statements - i.e. in many cases clients do not need to
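
Editor's note: the feature described in the NEWS entry above is driven entirely by the
new schema option and nodetool command. An illustrative usage sketch (keyspace and
table names are hypothetical; the option values and the -g flag are the ones introduced
by this patch):

```sql
-- enable overlap-aware garbage collection for all compactions of a table
ALTER TABLE ks.tbl WITH compaction =
    { 'class': 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones': 'ROW' };

-- or run a one-off garbage-collecting compaction from the command line:
--   nodetool garbagecollect -g CELL ks tbl
```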
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 594da98..20dac1e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -71,6 +71,7 @@
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;
 import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
@@ -1580,6 +1581,11 @@
         return CompactionManager.instance.relocateSSTables(this, jobs);
     }
 
+    public CompactionManager.AllSSTableOpStatus garbageCollect(TombstoneOption tombstoneOption, int jobs) throws ExecutionException, InterruptedException
+    {
+        return CompactionManager.instance.performGarbageCollection(this, tombstoneOption, jobs);
+    }
+
     public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
     {
         assert !sstables.isEmpty();
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 4728ec3..83592f0 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -44,6 +44,7 @@
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -492,6 +493,7 @@
         uncheckedOptions.remove(LOG_ALL_OPTION);
         uncheckedOptions.remove(COMPACTION_ENABLED);
         uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES);
+        uncheckedOptions.remove(CompactionParams.Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString());
         return uncheckedOptions;
     }
 
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e42e7a1..08ad0c0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -20,18 +20,23 @@
 import java.util.*;
 
 import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
-
 import org.apache.cassandra.utils.OverlapIterator;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -53,6 +58,10 @@
     private Refs<SSTableReader> overlappingSSTables;
     private OverlapIterator<PartitionPosition, SSTableReader> overlapIterator;
     private final Iterable<SSTableReader> compacting;
+    private final RateLimiter limiter;
+    private final long minTimestamp;
+    final TombstoneOption tombstoneOption;
+    final Map<SSTableReader, FileDataInput> openDataFiles = new HashMap<>();
 
     public final int gcBefore;
 
@@ -63,11 +72,23 @@
 
     public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore)
     {
+        this(cfs, compacting, gcBefore,
+             CompactionManager.instance.getRateLimiter(),
+             cfs.getCompactionStrategyManager().getCompactionParams().tombstoneOption());
+    }
+
+    public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption)
+    {
         assert cfs != null;
         this.cfs = cfs;
         this.gcBefore = gcBefore;
         this.compacting = compacting;
+        this.limiter = limiter;
         compactingRepaired = compacting != null && compacting.stream().allMatch(SSTableReader::isRepaired);
+        this.tombstoneOption = tombstoneOption;
+        this.minTimestamp = compacting != null && !compacting.isEmpty()       // check needed for test
+                          ? compacting.stream().mapToLong(SSTableReader::getMinTimestamp).min().getAsLong()
+                          : 0;
         refreshOverlaps();
         if (NEVER_PURGE_TOMBSTONES)
             logger.warn("You are running with -Dcassandra.never_purge_tombstones=true, this is dangerous!");
@@ -97,7 +118,7 @@
             return;
 
         if (this.overlappingSSTables != null)
-            overlappingSSTables.release();
+            close();
 
         if (compacting == null)
             overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList());
@@ -228,6 +249,9 @@
     {
         if (overlappingSSTables != null)
             overlappingSSTables.release();
+
+        FileUtils.closeQuietly(openDataFiles.values());
+        openDataFiles.clear();
     }
 
     public boolean compactingRepaired()
@@ -235,4 +259,38 @@
         return !cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones() || compactingRepaired;
     }
 
+    boolean provideTombstoneSources()
+    {
+        return tombstoneOption != TombstoneOption.NONE;
+    }
+
+    // caller must close iterators
+    public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly)
+    {
+        if (!provideTombstoneSources() || !compactingRepaired() || NEVER_PURGE_TOMBSTONES)
+            return null;
+        overlapIterator.update(key);
+        return Iterables.filter(Iterables.transform(overlapIterator.overlaps(),
+                                                    reader -> getShadowIterator(reader, key, tombstoneOnly)),
+                                Predicates.notNull());
+    }
+
+    @SuppressWarnings("resource") // caller to close
+    private UnfilteredRowIterator getShadowIterator(SSTableReader reader, DecoratedKey key, boolean tombstoneOnly)
+    {
+        if (reader.isMarkedSuspect() ||
+            reader.getMaxTimestamp() <= minTimestamp ||
+            tombstoneOnly && !reader.hasTombstones())
+            return null;
+        RowIndexEntry<?> position = reader.getPosition(key, SSTableReader.Operator.EQ);
+        if (position == null)
+            return null;
+        FileDataInput dfile = openDataFiles.computeIfAbsent(reader, this::openDataFile);
+        return reader.simpleIterator(dfile, key, position, tombstoneOnly);
+    }
+
+    private FileDataInput openDataFile(SSTableReader reader)
+    {
+        return limiter != null ? reader.openDataReader(limiter) : reader.openDataReader();
+    }
 }
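
Editor's note on the shadowSources/getShadowIterator pair added above: the result is
built lazily with Guava, transforming each overlapping reader into a per-partition
iterator and filtering out the nulls returned for readers that cannot shadow anything
(suspect, entirely older than the compacted data, or tombstone-free when only
tombstones are wanted). A minimal standalone sketch of that transform-and-filter idiom,
with strings standing in for readers (all names illustrative):

```java
import java.util.Arrays;

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;

public class LazyShadowSourcesDemo
{
    public static void main(String[] args)
    {
        Iterable<String> readers = Arrays.asList("a", "bb", "ccc");
        // The transform may return null (reader has nothing for this key);
        // filtering by notNull() drops those entries lazily, as shadowSources does.
        Iterable<String> sources = Iterables.filter(
                Iterables.transform(readers, r -> r.length() > 1 ? r.toUpperCase() : null),
                Predicates.notNull());
        for (String s : sources)
            System.out.println(s); // prints BB then CCC
    }
}
```

As in the patch, evaluation happens only when the result is iterated, so the
per-reader work runs only as the caller consumes the iterable.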
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 0111aec..c4edfa6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -17,14 +17,13 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.partitions.PurgeFunction;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
@@ -33,6 +32,7 @@
 import org.apache.cassandra.index.transactions.CompactionTransaction;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.metrics.CompactionMetrics;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 
 /**
  * Merge multiple iterators over the content of sstable into a "compacted" iterator.
@@ -52,7 +52,6 @@
  */
 public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator
 {
-    private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
     private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
 
     private final OperationType type;
@@ -104,6 +103,7 @@
                                              ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
                                              : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
         boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug
+        merged = Transformation.apply(merged, new GarbageSkipper(controller, nowInSec));
         this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
     }
 
@@ -313,4 +313,237 @@
             return maxPurgeableTimestamp;
         }
     }
+
+    /**
+     * Unfiltered row iterator that removes deleted data as provided by a "tombstone source" for the partition.
+     * The result produced by this iterator is such that when merged with tombSource it produces the same output
+     * as the merge of dataSource and tombSource.
+     */
+    private static class GarbageSkippingUnfilteredRowIterator extends WrappingUnfilteredRowIterator
+    {
+        final UnfilteredRowIterator tombSource;
+        final DeletionTime partitionLevelDeletion;
+        final Row staticRow;
+        final ColumnFilter cf;
+        final int nowInSec;
+        final CFMetaData metadata;
+        final boolean cellLevelGC;
+
+        DeletionTime tombOpenDeletionTime = DeletionTime.LIVE;
+        DeletionTime dataOpenDeletionTime = DeletionTime.LIVE;
+        DeletionTime openDeletionTime = DeletionTime.LIVE;
+        DeletionTime partitionDeletionTime;
+        DeletionTime activeDeletionTime;
+        Unfiltered tombNext = null;
+        Unfiltered dataNext = null;
+        Unfiltered next = null;
+
+        /**
+         * Construct an iterator that filters out data shadowed by the provided "tombstone source".
+         *
+         * @param dataSource The input partition. The result is a filtered version of this.
+         * @param tombSource Tombstone source, i.e. iterator used to identify deleted data in the input row.
+         * @param nowInSec Current time, used in choosing the winner when cell expiration is involved.
+         * @param cellLevelGC If false, the iterator will only look at row-level deletion times and tombstones.
+         *                    If true, deleted or overwritten cells within a surviving row will also be removed.
+         */
+        protected GarbageSkippingUnfilteredRowIterator(UnfilteredRowIterator dataSource, UnfilteredRowIterator tombSource, int nowInSec, boolean cellLevelGC)
+        {
+            super(dataSource);
+            this.tombSource = tombSource;
+            this.nowInSec = nowInSec;
+            this.cellLevelGC = cellLevelGC;
+            metadata = dataSource.metadata();
+            cf = ColumnFilter.all(metadata);
+
+            activeDeletionTime = partitionDeletionTime = tombSource.partitionLevelDeletion();
+
+            // Only preserve partition level deletion if not shadowed. (Note: Shadowing deletion must not be copied.)
+            this.partitionLevelDeletion = dataSource.partitionLevelDeletion().supersedes(tombSource.partitionLevelDeletion()) ?
+                    dataSource.partitionLevelDeletion() :
+                    DeletionTime.LIVE;
+
+            Row dataStaticRow = garbageFilterRow(dataSource.staticRow(), tombSource.staticRow());
+            this.staticRow = dataStaticRow != null ? dataStaticRow : Rows.EMPTY_STATIC_ROW;
+
+            tombNext = advance(tombSource);
+            dataNext = advance(dataSource);
+        }
+
+        private static Unfiltered advance(UnfilteredRowIterator source)
+        {
+            return source.hasNext() ? source.next() : null;
+        }
+
+        @Override
+        public DeletionTime partitionLevelDeletion()
+        {
+            return partitionLevelDeletion;
+        }
+
+        public void close()
+        {
+            super.close();
+            tombSource.close();
+        }
+
+        @Override
+        public Row staticRow()
+        {
+            return staticRow;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            // Produce the next element. This may consume multiple elements from both inputs until we find something
+            // from dataSource that is still live. We track the currently open deletion in both sources, as well as the
+            // one we have last issued to the output. The tombOpenDeletionTime is used to filter out content; the others
+            // to decide whether or not a tombstone is superseded, and to be able to surface (the rest of) a deletion
+            // range from the input when a suppressing deletion ends.
+            while (next == null && dataNext != null)
+            {
+                int cmp = tombNext == null ? -1 : metadata.comparator.compare(dataNext, tombNext);
+                if (cmp < 0)
+                {
+                    if (dataNext.isRow())
+                        next = ((Row) dataNext).filter(cf, activeDeletionTime, false, metadata);
+                    else
+                        next = processDataMarker();
+                }
+                else if (cmp == 0)
+                {
+                    if (dataNext.isRow())
+                    {
+                        next = garbageFilterRow((Row) dataNext, (Row) tombNext);
+                    }
+                    else
+                    {
+                        tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext);
+                        activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
+                                                                    tombOpenDeletionTime);
+                        next = processDataMarker();
+                    }
+                }
+                else // (cmp > 0)
+                {
+                    if (tombNext.isRangeTombstoneMarker())
+                    {
+                        tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext);
+                        activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
+                                                                    tombOpenDeletionTime);
+                        boolean supersededBefore = openDeletionTime.isLive();
+                        boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime);
+                        // If a range open was not issued because it was superseded and the deletion isn't superseded any more, we need to open it now.
+                        if (supersededBefore && !supersededAfter)
+                            next = new RangeTombstoneBoundMarker(((RangeTombstoneMarker) tombNext).closeBound(false).invert(), dataOpenDeletionTime);
+                        // If the deletion begins to be superseded, we don't close the range yet. This can save us a close/open pair if it ends after the superseding range.
+                    }
+                }
+
+                if (next instanceof RangeTombstoneMarker)
+                    openDeletionTime = updateOpenDeletionTime(openDeletionTime, next);
+
+                if (cmp <= 0)
+                    dataNext = advance(wrapped);
+                if (cmp >= 0)
+                    tombNext = advance(tombSource);
+            }
+            return next != null;
+        }
+
+        protected Row garbageFilterRow(Row dataRow, Row tombRow)
+        {
+            if (cellLevelGC)
+            {
+                return Rows.removeShadowedCells(dataRow, tombRow, activeDeletionTime, nowInSec);
+            }
+            else
+            {
+                DeletionTime deletion = Ordering.natural().max(tombRow.deletion().time(),
+                                                               activeDeletionTime);
+                return dataRow.filter(cf, deletion, false, metadata);
+            }
+        }
+
+        /**
+         * Decide how to act on a tombstone marker from the input iterator. We can decide what to issue depending on
+         * whether or not the ranges before and after the marker are superseded/live -- if none are, we can reuse the
+         * marker; if both are, the marker can be ignored; otherwise we issue a corresponding start/end marker.
+         */
+        private RangeTombstoneMarker processDataMarker()
+        {
+            dataOpenDeletionTime = updateOpenDeletionTime(dataOpenDeletionTime, dataNext);
+            boolean supersededBefore = openDeletionTime.isLive();
+            boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime);
+            RangeTombstoneMarker marker = (RangeTombstoneMarker) dataNext;
+            if (!supersededBefore)
+                if (!supersededAfter)
+                    return marker;
+                else
+                    return new RangeTombstoneBoundMarker(marker.closeBound(false), marker.closeDeletionTime(false));
+            else
+                if (!supersededAfter)
+                    return new RangeTombstoneBoundMarker(marker.openBound(false), marker.openDeletionTime(false));
+                else
+                    return null;
+        }
+
+        @Override
+        public Unfiltered next()
+        {
+            if (!hasNext())
+                throw new IllegalStateException();
+
+            Unfiltered v = next;
+            next = null;
+            return v;
+        }
+
+        private DeletionTime updateOpenDeletionTime(DeletionTime openDeletionTime, Unfiltered next)
+        {
+            RangeTombstoneMarker marker = (RangeTombstoneMarker) next;
+            assert openDeletionTime.isLive() == !marker.isClose(false);
+            assert openDeletionTime.isLive() || openDeletionTime.equals(marker.closeDeletionTime(false));
+            return marker.isOpen(false) ? marker.openDeletionTime(false) : DeletionTime.LIVE;
+        }
+    }
+
+    /**
+     * Partition transformation applying GarbageSkippingUnfilteredRowIterator, obtaining tombstone sources for each
+     * partition using the controller's shadowSources method.
+     */
+    private static class GarbageSkipper extends Transformation<UnfilteredRowIterator>
+    {
+        final int nowInSec;
+        final CompactionController controller;
+        final boolean cellLevelGC;
+
+        private GarbageSkipper(CompactionController controller, int nowInSec)
+        {
+            this.controller = controller;
+            this.nowInSec = nowInSec;
+            cellLevelGC = controller.tombstoneOption == TombstoneOption.CELL;
+        }
+
+        @Override
+        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+        {
+            Iterable<UnfilteredRowIterator> sources = controller.shadowSources(partition.partitionKey(), !cellLevelGC);
+            if (sources == null)
+                return partition;
+            List<UnfilteredRowIterator> iters = new ArrayList<>();
+            for (UnfilteredRowIterator iter : sources)
+            {
+                if (!iter.isEmpty())
+                    iters.add(iter);
+                else
+                    iter.close();
+            }
+            if (iters.isEmpty())
+                return partition;
+
+            return new GarbageSkippingUnfilteredRowIterator(partition, UnfilteredRowIterators.merge(iters, nowInSec), nowInSec, cellLevelGC);
+        }
+    }
 }
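
Editor's note: the invariant promised by GarbageSkippingUnfilteredRowIterator's javadoc
— merging the filtered output with the tombstone source gives the same result as merging
the raw input with it — is easiest to see on a drastically simplified model with no
range tombstones or cells, just keys and write timestamps. A sketch under those
assumptions (deletions win ties, mirroring reconciliation):

```java
import java.util.Map;
import java.util.TreeMap;

public class GarbageSkipModel
{
    // data/tombs: key -> write timestamp. Drop every entry of `data` that the
    // tombstone source supersedes; merging the result with `tombs` then yields
    // the same output as merging the original `data` with `tombs`.
    static Map<String, Long> skip(Map<String, Long> data, Map<String, Long> tombs)
    {
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, Long> e : data.entrySet())
        {
            Long deletedAt = tombs.get(e.getKey());
            if (deletedAt == null || deletedAt < e.getValue()) // deletion wins ties
                result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args)
    {
        Map<String, Long> data = new TreeMap<>(Map.of("a", 1L, "b", 5L, "c", 3L));
        Map<String, Long> tombs = new TreeMap<>(Map.of("a", 2L, "b", 4L));
        System.out.println(skip(data, tombs)); // {b=5, c=3} -- "a" is shadowed
    }
}
```

The real iterator does the same thing as a streaming merge over two sorted inputs,
which is why hasNext() has to track the open deletion times on both sides.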
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 519ff05..1cfc76b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -63,6 +63,7 @@
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
@@ -442,7 +443,7 @@
             public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
             {
                 List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals());
-                Collections.sort(sortedSSTables, new SSTableReader.SizeComparator());
+                Collections.sort(sortedSSTables, SSTableReader.sizeComparator);
                 return sortedSSTables;
             }
 
@@ -455,6 +456,42 @@
         }, jobs, OperationType.CLEANUP);
     }
 
+    public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
+    {
+        assert !cfStore.isIndex();
+
+        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
+        {
+            @Override
+            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
+            {
+                Iterable<SSTableReader> originals = transaction.originals();
+                if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
+                    originals = Iterables.filter(originals, SSTableReader::isRepaired);
+                List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
+                Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator);
+                return sortedSSTables;
+            }
+
+            @Override
+            public void execute(LifecycleTransaction txn) throws IOException
+            {
+                logger.debug("Garbage collecting {}", txn.originals());
+                CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
+                {
+                    @Override
+                    protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
+                    {
+                        return new CompactionController(cfStore, toCompact, gcBefore, getRateLimiter(), tombstoneOption);
+                    }
+                };
+                task.setUserDefined(true);
+                task.setCompactionType(OperationType.GARBAGE_COLLECT);
+                task.execute(metrics);
+            }
+        }, jobs, OperationType.GARBAGE_COLLECT);
+    }
+
     public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException
     {
         if (!cfs.getPartitioner().splitter().isPresent())
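
Editor's note: performGarbageCollection reuses the regular CompactionTask machinery and
only swaps in a controller carrying the requested tombstone granularity, by overriding
a protected factory method on an anonymous subclass. A standalone sketch of that
template-method override (the classes below are illustrative stand-ins, not Cassandra
APIs):

```java
public class FactoryOverrideDemo
{
    static class Controller
    {
        final String tombstoneOption;
        Controller(String tombstoneOption) { this.tombstoneOption = tombstoneOption; }
    }

    static class Task
    {
        void execute()
        {
            // The base class drives the work; subclasses only customize the controller.
            Controller c = getController();
            System.out.println("compacting with provide_overlapping_tombstones=" + c.tombstoneOption);
        }

        protected Controller getController() { return new Controller("NONE"); }
    }

    public static void main(String[] args)
    {
        Task gcTask = new Task()
        {
            @Override
            protected Controller getController() { return new Controller("ROW"); }
        };
        gcTask.execute(); // compacting with provide_overlapping_tombstones=ROW
    }
}
```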
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index cfe0121..5442a2d 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -138,7 +138,7 @@
         if (sstablesWithTombstones.isEmpty())
             return Collections.emptyList();
 
-        return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+        return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator));
     }
 
     private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables, long now, int base)
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index ec5e1d9..25c5d20 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -145,7 +145,17 @@
     @Override
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
     {
-        throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions");
+        if (sstables.size() != 1)
+            throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions");
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (transaction == null)
+        {
+            logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
+            return null;
+        }
+        int level = sstables.iterator().next().getSSTableLevel();
+        return getCompactionTask(transaction, gcBefore, level == 0 ? Integer.MAX_VALUE : getMaxSSTableBytes());
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 84a34c9..27b8530 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -38,7 +38,8 @@
     WRITE("Write"),
     VIEW_BUILD("View build"),
     INDEX_SUMMARY("Index summary redistribution"),
-    RELOCATE("Relocate sstables to correct disk");
+    RELOCATE("Relocate sstables to correct disk"),
+    GARBAGE_COLLECT("Remove deleted data");
 
     public final String type;
     public final String fileName;
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index a9cb211..2cfc75d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -210,7 +210,8 @@
                     if (indexFile != null && dataStart != dataStartFromIndex)
                         outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataStartFromIndex));
 
-                    try (UnfilteredRowIterator iterator = withValidation(new RowMergingSSTableIterator(sstable, dataFile, key), dataFile.getPath()))
+                    try (UnfilteredRowIterator iterator = withValidation(new RowMergingSSTableIterator(SSTableIdentityIterator.create(sstable, dataFile, key)),
+                                                                         dataFile.getPath()))
                     {
                         if (prevKey != null && prevKey.compareTo(key) > 0)
                         {
@@ -241,7 +242,7 @@
                         {
                             dataFile.seek(dataStartFromIndex);
 
-                            try (UnfilteredRowIterator iterator = withValidation(new SSTableIdentityIterator(sstable, dataFile, key), dataFile.getPath()))
+                            try (UnfilteredRowIterator iterator = withValidation(SSTableIdentityIterator.create(sstable, dataFile, key), dataFile.getPath()))
                             {
                                 if (prevKey != null && prevKey.compareTo(key) > 0)
                                 {
@@ -471,38 +472,43 @@
      *
      * For more details, refer to CASSANDRA-12144.
      */
-    private static class RowMergingSSTableIterator extends SSTableIdentityIterator
+    private static class RowMergingSSTableIterator extends WrappingUnfilteredRowIterator
     {
-        RowMergingSSTableIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+        Unfiltered nextToOffer = null;
+
+        RowMergingSSTableIterator(UnfilteredRowIterator source)
         {
-            super(sstable, file, key);
+            super(source);
         }
 
         @Override
-        protected Unfiltered doCompute()
+        public boolean hasNext()
         {
-            if (!iterator.hasNext())
-                return endOfData();
+            return nextToOffer != null || wrapped.hasNext();
+        }
 
-            Unfiltered next = iterator.next();
-            if (!next.isRow())
-                return next;
+        @Override
+        public Unfiltered next()
+        {
+            Unfiltered next = nextToOffer != null ? nextToOffer : wrapped.next();
 
-            while (iterator.hasNext())
+            if (next.isRow())
             {
-                Unfiltered peek = iterator.peek();
-                // If there was a duplicate row, merge it.
-                if (next.clustering().equals(peek.clustering()) && peek.isRow())
+                while (wrapped.hasNext())
                 {
-                    iterator.next(); // Make sure that the peeked item was consumed.
+                    Unfiltered peek = wrapped.next();
+                    if (!peek.isRow() || !next.clustering().equals(peek.clustering()))
+                    {
+                        nextToOffer = peek; // Offer peek in next call
+                        return next;
+                    }
+    
+                    // Duplicate row, merge it.
                     next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
                 }
-                else
-                {
-                    break;
-                }
             }
 
+            nextToOffer = null;
             return next;
         }
     }
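
Editor's note: the rewritten RowMergingSSTableIterator replaces the peeking
AbstractIterator with an explicit one-element lookahead. Runs of rows with equal
clustering are merged, and the first non-matching unfiltered is buffered in nextToOffer
for the following next() call. A self-contained model of that lookahead over plain
key/value entries (merging here simply keeps the larger value; purely illustrative):

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class DuplicateMergingIterator implements Iterator<Map.Entry<String, Integer>>
{
    private final Iterator<Map.Entry<String, Integer>> wrapped;
    private Map.Entry<String, Integer> nextToOffer = null;

    DuplicateMergingIterator(Iterator<Map.Entry<String, Integer>> wrapped)
    {
        this.wrapped = wrapped;
    }

    public boolean hasNext()
    {
        return nextToOffer != null || wrapped.hasNext();
    }

    public Map.Entry<String, Integer> next()
    {
        Map.Entry<String, Integer> next = nextToOffer != null ? nextToOffer : wrapped.next();
        while (wrapped.hasNext())
        {
            Map.Entry<String, Integer> peek = wrapped.next();
            if (!peek.getKey().equals(next.getKey()))
            {
                nextToOffer = peek; // buffer it for the following call
                return next;
            }
            // Duplicate key: merge it (the real code calls Rows.merge).
            next = Map.entry(next.getKey(), Math.max(next.getValue(), peek.getValue()));
        }
        nextToOffer = null;
        return next;
    }

    public static void main(String[] args)
    {
        List<Map.Entry<String, Integer>> rows =
                List.of(Map.entry("a", 1), Map.entry("a", 3), Map.entry("b", 2));
        Iterator<Map.Entry<String, Integer>> it = new DuplicateMergingIterator(rows.iterator());
        while (it.hasNext())
            System.out.println(it.next()); // a=3 then b=2
    }
}
```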
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 7cca4a7..8302a9b 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -101,8 +101,7 @@
         if (sstablesWithTombstones.isEmpty())
             return Collections.emptyList();
 
-        Collections.sort(sstablesWithTombstones, new SSTableReader.SizeComparator());
-        return Collections.singletonList(sstablesWithTombstones.get(0));
+        return Collections.singletonList(Collections.max(sstablesWithTombstones, SSTableReader.sizeComparator));
     }
 
 
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 55daaa1..fd53930 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -143,7 +143,7 @@
         if (sstablesWithTombstones.isEmpty())
             return Collections.emptyList();
 
-        return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+        return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator));
     }
 
     private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
@@ -314,7 +314,7 @@
         List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
 
         // Trim the largest sstables off the end to meet the maxThreshold
-        Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+        Collections.sort(ssTableReaders, SSTableReader.sizeComparator);
 
         return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
     }
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 91c7ad7..17b1187 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -187,7 +187,7 @@
                         markAndThrow();
 
                     //mimic the scrub read path
-                    try (UnfilteredRowIterator iterator = new SSTableIdentityIterator(sstable, dataFile, key))
+                    try (UnfilteredRowIterator iterator = SSTableIdentityIterator.create(sstable, dataFile, key))
                     {
                     }
 
diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java
index 54df26e..38bde16 100644
--- a/src/java/org/apache/cassandra/db/rows/Cells.java
+++ b/src/java/org/apache/cassandra/db/rows/Cells.java
@@ -233,6 +233,86 @@
         return timeDelta;
     }
 
+    /**
+     * Adds to the builder a representation of the given existing cell that, when merged/reconciled with the given
+     * update cell, produces the same result as merging the original with the update.
+     * <p>
+     * For simple cells that is either the original cell (if still live), or nothing (if shadowed).
+     *
+     * @param existing the pre-existing cell, the one that is updated.
+     * @param update the newly added cell, the update. This can be {@code null} out
+     * of convenience, in which case this function simply copies {@code existing} to
+     * {@code builder}.
+     * @param deletion the deletion time that applies to the cells being considered.
+     * This deletion time may delete either {@code existing} or {@code update}.
+     * @param builder the row builder to which the result of the filtering is written.
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality, and deciding if a
+     * cell is live or not depends on the current time due to expiring cells).
+     */
+    public static void addNonShadowed(Cell existing,
+                                      Cell update,
+                                      DeletionTime deletion,
+                                      Row.Builder builder,
+                                      int nowInSec)
+    {
+        if (deletion.deletes(existing))
+            return;
+
+        Cell reconciled = reconcile(existing, update, nowInSec);
+        if (reconciled != update)
+            builder.addCell(existing);
+    }
+
+    /**
+     * Adds to the builder a representation of the given existing cells that, when merged/reconciled
+     * with the given update cells, produces the same result as merging the originals with the update.
+     * <p>
+     * For each cell this is either the original cell (if still live), or nothing (if shadowed).
+     *
+     * @param column the complex column the cells are for.
+     * @param existing the pre-existing cells, the ones that are updated.
+     * @param update the newly added cells, the update. This can be {@code null} out
+     * of convenience, in which case this function simply copies the cells from
+     * {@code existing} to {@code builder}.
+     * @param deletion the deletion time that applies to the cells being considered.
+     * This deletion time may delete either {@code existing} or {@code update} cells.
+     * @param builder the row builder to which the result of the filtering is written.
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality, and deciding if a
+     * cell is live or not depends on the current time due to expiring cells).
+     */
+    public static void addNonShadowedComplex(ColumnDefinition column,
+                                             Iterator<Cell> existing,
+                                             Iterator<Cell> update,
+                                             DeletionTime deletion,
+                                             Row.Builder builder,
+                                             int nowInSec)
+    {
+        Comparator<CellPath> comparator = column.cellPathComparator();
+        Cell nextExisting = getNext(existing);
+        Cell nextUpdate = getNext(update);
+        while (nextExisting != null)
+        {
+            int cmp = nextUpdate == null ? -1 : comparator.compare(nextExisting.path(), nextUpdate.path());
+            if (cmp < 0)
+            {
+                addNonShadowed(nextExisting, null, deletion, builder, nowInSec);
+                nextExisting = getNext(existing);
+            }
+            else if (cmp == 0)
+            {
+                addNonShadowed(nextExisting, nextUpdate, deletion, builder, nowInSec);
+                nextExisting = getNext(existing);
+                nextUpdate = getNext(update);
+            }
+            else
+            {
+                nextUpdate = getNext(update);
+            }
+        }
+    }
+
     private static Cell getNext(Iterator<Cell> iterator)
     {
         return iterator == null || !iterator.hasNext() ? null : iterator.next();
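
Editor's note: addNonShadowedComplex is a classic two-pointer merge. Both cell
iterators are sorted by cell path, and an existing cell survives only when reconciling
it with the matching update cell would not pick the update. A simplified sketch of the
merge, modelling a cell as a {path, timestamp} pair, letting the larger timestamp win,
and omitting the deletion-time check the real code also applies:

```java
import java.util.ArrayList;
import java.util.List;

public class NonShadowedMergeModel
{
    // Each cell is {path, timestamp}; both lists are sorted by path.
    static List<long[]> nonShadowed(List<long[]> existing, List<long[]> update)
    {
        List<long[]> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < existing.size())
        {
            long[] e = existing.get(i);
            long[] u = j < update.size() ? update.get(j) : null;
            int cmp = u == null ? -1 : Long.compare(e[0], u[0]);
            if (cmp < 0) { out.add(e); i++; }                             // no update on this path
            else if (cmp == 0) { if (e[1] > u[1]) out.add(e); i++; j++; } // keep only if it wins
            else { j++; }                                                 // update-only path: skip
        }
        return out;
    }

    public static void main(String[] args)
    {
        List<long[]> existing = List.of(new long[]{ 1, 10 }, new long[]{ 2, 5 });
        List<long[]> update   = List.of(new long[]{ 2, 7 },  new long[]{ 3, 1 });
        for (long[] c : nonShadowed(existing, update))
            System.out.println(c[0] + "@" + c[1]); // prints 1@10; path 2 is shadowed
    }
}
```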
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index 4f6c8d2..e6d9062 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -25,6 +25,7 @@
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.db.rows.Row.Deletion;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.WrappedInt;
 
@@ -311,4 +312,80 @@
         }
         return timeDelta;
     }
+
+    /**
+     * Returns a row that is obtained from the given existing row by removing everything that is shadowed by data in
+     * the update row. In other words, produces the smallest result row such that
+     * {@code merge(result, update, nowInSec) == merge(existing, update, nowInSec)} after filtering by rangeDeletion.
+     *
+     * @param existing source row
+     * @param update shadowing row
+     * @param rangeDeletion extra {@code DeletionTime} from covering tombstone
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality, and deciding if a
+     * cell is live or not depends on the current time due to expiring cells).
+     */
+    public static Row removeShadowedCells(Row existing, Row update, DeletionTime rangeDeletion, int nowInSec)
+    {
+        Row.Builder builder = BTreeRow.sortedBuilder();
+        Clustering clustering = existing.clustering();
+        builder.newRow(clustering);
+
+        DeletionTime deletion = update.deletion().time();
+        if (rangeDeletion.supersedes(deletion))
+            deletion = rangeDeletion;
+
+        LivenessInfo existingInfo = existing.primaryKeyLivenessInfo();
+        if (!deletion.deletes(existingInfo))
+            builder.addPrimaryKeyLivenessInfo(existingInfo);
+        Row.Deletion rowDeletion = existing.deletion();
+        if (!deletion.supersedes(rowDeletion.time()))
+            builder.addRowDeletion(rowDeletion);
+
+        Iterator<ColumnData> a = existing.iterator();
+        Iterator<ColumnData> b = update.iterator();
+        ColumnData nexta = a.hasNext() ? a.next() : null, nextb = b.hasNext() ? b.next() : null;
+        while (nexta != null)
+        {
+            int comparison = nextb == null ? -1 : nexta.column.compareTo(nextb.column);
+            if (comparison <= 0)
+            {
+                ColumnData cura = nexta;
+                ColumnDefinition column = cura.column;
+                ColumnData curb = comparison == 0 ? nextb : null;
+                if (column.isSimple())
+                {
+                    Cells.addNonShadowed((Cell) cura, (Cell) curb, deletion, builder, nowInSec);
+                }
+                else
+                {
+                    ComplexColumnData existingData = (ComplexColumnData) cura;
+                    ComplexColumnData updateData = (ComplexColumnData) curb;
+
+                    DeletionTime existingDt = existingData.complexDeletion();
+                    DeletionTime updateDt = updateData == null ? DeletionTime.LIVE : updateData.complexDeletion();
+
+                    DeletionTime maxDt = updateDt.supersedes(deletion) ? updateDt : deletion;
+                    if (existingDt.supersedes(maxDt))
+                    {
+                        builder.addComplexDeletion(column, existingDt);
+                        maxDt = existingDt;
+                    }
+
+                    Iterator<Cell> existingCells = existingData.iterator();
+                    Iterator<Cell> updateCells = updateData == null ? null : updateData.iterator();
+                    Cells.addNonShadowedComplex(column, existingCells, updateCells, maxDt, builder, nowInSec);
+                }
+                nexta = a.hasNext() ? a.next() : null;
+                if (curb != null)
+                    nextb = b.hasNext() ? b.next() : null;
+            }
+            else
+            {
+                nextb = b.hasNext() ? b.next() : null;
+            }
+        }
+        Row row = builder.build();
+        return row != null && !row.isEmpty() ? row : null;
+    }
 }
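
Editor's note: removeShadowedCells first settles which deletion applies — the stronger
of the update row's deletion and the covering range deletion — and then lets the
existing row's liveness info and row deletion through only if they survive it.
Reducing DeletionTime to a bare timestamp, the precedence logic amounts to this sketch
(a simplification; real DeletionTime also carries a local deletion time and
supersedes() semantics):

```java
public class DeletionPrecedenceModel
{
    public static void main(String[] args)
    {
        long rangeDeletion = 8, updateRowDeletion = 5, existingLiveness = 9;
        // Effective deletion is the stronger (newer) of the two shadowing deletions.
        long effective = Math.max(rangeDeletion, updateRowDeletion);
        // The existing liveness survives only if written after the effective deletion.
        boolean keepLiveness = existingLiveness > effective;
        System.out.println("effective=" + effective + " keepLiveness=" + keepLiveness);
        // prints effective=8 keepLiveness=true
    }
}
```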
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index ed6bd12..db18859 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -24,9 +24,11 @@
 import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Row.Deletion;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.WrappedException;
 
@@ -450,6 +452,58 @@
         }
     }
 
+    public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeader header, SerializationHelper helper)
+    throws IOException
+    {
+        while (true)
+        {
+            int flags = in.readUnsignedByte();
+            if (isEndOfPartition(flags))
+                return null;
+
+            int extendedFlags = readExtendedFlags(in, flags);
+
+            if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+            {
+                ClusteringBoundOrBoundary bound = ClusteringBoundOrBoundary.serializer.deserialize(in, helper.version, header.clusteringTypes());
+                return deserializeMarkerBody(in, header, bound);
+            }
+            else
+            {
+                assert !isStatic(extendedFlags); // deserializeStaticRow should be used for that.
+                if ((flags & HAS_DELETION) != 0)
+                {
+                    assert header.isForSSTable();
+                    boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
+                    boolean hasTTL = (flags & HAS_TTL) != 0;
+                    boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
+                    Clustering clustering = Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes());
+                    long nextPosition = in.readUnsignedVInt() + in.getFilePointer();
+                    in.readUnsignedVInt(); // skip previous unfiltered size
+                    if (hasTimestamp)
+                    {
+                        header.readTimestamp(in);
+                        if (hasTTL)
+                        {
+                            header.readTTL(in);
+                            header.readLocalDeletionTime(in);
+                        }
+                    }
+
+                    Deletion deletion = new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable);
+                    in.seek(nextPosition);
+                    return BTreeRow.emptyDeletedRow(clustering, deletion);
+                }
+                else
+                {
+                    Clustering.serializer.skip(in, helper.version, header.clusteringTypes());
+                    skipRowBody(in);
+                    // Continue with next item.
+                }
+            }
+        }
+    }
+
     public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper)
     throws IOException
     {
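
Editor's note: deserializeTombstonesOnly never materializes live rows. It reads the
stored "next unfiltered size", extracts only the clustering and deletion for deleted
rows, and seeks straight past everything else. The enabling trick is plain
length-prefixed record skipping; a self-contained sketch with java.io streams (the
record layout below is invented for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class LengthPrefixedSkipDemo
{
    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        for (String s : new String[]{ "live-row", "tombstone", "live-row-2" })
        {
            byte[] body = s.getBytes("UTF-8");
            out.writeInt(body.length); // size prefix, like the unfiltered size in sstables
            out.write(body);
        }

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        for (int i = 0; i < 3; i++)
        {
            int size = in.readInt();
            if (i == 1) // pretend only record 1 carries a deletion we care about
            {
                byte[] body = new byte[size];
                in.readFully(body);
                System.out.println("decoded: " + new String(body, "UTF-8"));
            }
            else
            {
                in.skipBytes(size); // skip the body without decoding it
            }
        }
    }
}
```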
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index de8d69b..1173d40 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -96,7 +96,7 @@
                             dataFile.seek(indexEntry.position);
                             ByteBufferUtil.readWithShortLength(dataFile); // key
 
-                            try (SSTableIdentityIterator partition = new SSTableIdentityIterator(sstable, dataFile, key))
+                            try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key))
                             {
                                 // if the row has statics attached, it has to be indexed separately
                                 indexWriter.nextUnfilteredCluster(partition.staticRow());
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index a5af334..2a79f88 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -19,15 +19,15 @@
 
 import java.io.*;
 
-import org.apache.cassandra.utils.AbstractIterator;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implements Comparable<SSTableIdentityIterator>, UnfilteredRowIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, UnfilteredRowIterator
 {
     private final SSTableReader sstable;
     private final DecoratedKey key;
@@ -37,29 +37,51 @@
     protected final SSTableSimpleIterator iterator;
     private final Row staticRow;
 
-    /**
-     * Used to iterate through the columns of a row.
-     * @param sstable SSTable we are reading ffrom.
-     * @param file Reading using this file.
-     * @param key Key of this row.
-     */
-    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+    public SSTableIdentityIterator(SSTableReader sstable, DecoratedKey key, DeletionTime partitionLevelDeletion,
+            String filename, SSTableSimpleIterator iterator) throws IOException
     {
+        super();
         this.sstable = sstable;
-        this.filename = file.getPath();
         this.key = key;
+        this.partitionLevelDeletion = partitionLevelDeletion;
+        this.filename = filename;
+        this.iterator = iterator;
+        this.staticRow = iterator.readStaticRow();
+    }
 
+    public static SSTableIdentityIterator create(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+    {
         try
         {
-            this.partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
+            DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
             SerializationHelper helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
-            this.iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion);
-            this.staticRow = iterator.readStaticRow();
+            SSTableSimpleIterator iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion);
+            return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, file.getPath(), iterator);
         }
         catch (IOException e)
         {
             sstable.markSuspect();
-            throw new CorruptSSTableException(e, filename);
+            throw new CorruptSSTableException(e, file.getPath());
+        }
+    }
+
+    public static SSTableIdentityIterator create(SSTableReader sstable, FileDataInput dfile, RowIndexEntry<?> indexEntry, DecoratedKey key, boolean tombstoneOnly)
+    {
+        try
+        {
+            dfile.seek(indexEntry.position);
+            ByteBufferUtil.skipShortLength(dfile); // Skip partition key
+            DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(dfile);
+            SerializationHelper helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
+            SSTableSimpleIterator iterator = tombstoneOnly
+                    ? SSTableSimpleIterator.createTombstoneOnly(sstable.metadata, dfile, sstable.header, helper, partitionLevelDeletion)
+                    : SSTableSimpleIterator.create(sstable.metadata, dfile, sstable.header, helper, partitionLevelDeletion);
+            return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, dfile.getPath(), iterator);
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, dfile.getPath());
         }
     }
 
@@ -93,7 +115,32 @@
         return staticRow;
     }
 
-    protected Unfiltered computeNext()
+    public boolean hasNext()
+    {
+        try
+        {
+            return iterator.hasNext();
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, filename);
+        }
+        catch (IOError e)
+        {
+            if (e.getCause() instanceof IOException)
+            {
+                sstable.markSuspect();
+                throw new CorruptSSTableException((Exception)e.getCause(), filename);
+            }
+            else
+            {
+                throw e;
+            }
+        }
+    }
+
+    public Unfiltered next()
     {
         try
         {
@@ -120,7 +167,7 @@
 
     protected Unfiltered doCompute()
     {
-        return iterator.hasNext() ? iterator.next() : endOfData();
+        return iterator.next();
     }
 
     public void close()
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 2d4314e..ce42126 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -30,6 +30,7 @@
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.net.MessagingService;
 
 /**
@@ -59,6 +60,14 @@
             return new CurrentFormatIterator(metadata, in, header, helper);
     }
 
+    public static SSTableSimpleIterator createTombstoneOnly(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion)
+    {
+        if (helper.version < MessagingService.VERSION_30)
+            return new OldFormatTombstoneIterator(metadata, in, helper, partitionDeletion);
+        else
+            return new CurrentFormatTombstoneIterator(metadata, in, header, helper);
+    }
+
     public abstract Row readStaticRow() throws IOException;
 
     private static class CurrentFormatIterator extends SSTableSimpleIterator
@@ -93,6 +102,41 @@
         }
     }
 
+    private static class CurrentFormatTombstoneIterator extends SSTableSimpleIterator
+    {
+        private final SerializationHeader header;
+
+        private CurrentFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper)
+        {
+            super(metadata, in, helper);
+            this.header = header;
+        }
+
+        public Row readStaticRow() throws IOException
+        {
+            if (header.hasStatic())
+            {
+                Row staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, helper);
+                if (!staticRow.deletion().isLive())
+                    return BTreeRow.emptyDeletedRow(staticRow.clustering(), staticRow.deletion());
+            }
+            return Rows.EMPTY_STATIC_ROW;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            try
+            {
+                Unfiltered unfiltered = UnfilteredSerializer.serializer.deserializeTombstonesOnly((FileDataInput) in, header, helper);
+                return unfiltered == null ? endOfData() : unfiltered;
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+    }
+
     private static class OldFormatIterator extends SSTableSimpleIterator
     {
         private final UnfilteredDeserializer deserializer;
@@ -163,4 +207,35 @@
 
     }
 
+    private static class OldFormatTombstoneIterator extends OldFormatIterator
+    {
+        private OldFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper, DeletionTime partitionDeletion)
+        {
+            super(metadata, in, helper, partitionDeletion);
+        }
+
+        public Row readStaticRow() throws IOException
+        {
+            Row row = super.readStaticRow();
+            if (!row.deletion().isLive())
+                return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion());
+            return Rows.EMPTY_STATIC_ROW;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            while (true)
+            {
+                Unfiltered unfiltered = super.computeNext();
+                if (unfiltered == null || unfiltered.isRangeTombstoneMarker())
+                    return unfiltered;
+
+                Row row = (Row) unfiltered;
+                if (!row.deletion().isLive())
+                    return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion());
+                // Otherwise read next.
+            }
+        }
+
+    }
 }
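Both tombstone-only iterators reduce a partition's stream to its deletion content: range tombstone markers pass through, rows carrying a deletion are stripped down to empty deleted rows, and live data is skipped. A minimal standalone sketch of that filtering step (a hypothetical helper, not part of the patch; the patch applies the filter during deserialization instead):

    import java.util.Iterator;
    import com.google.common.collect.AbstractIterator;
    import org.apache.cassandra.db.rows.*;

    class TombstoneOnlyFilter extends AbstractIterator<Unfiltered>
    {
        private final Iterator<Unfiltered> source;

        TombstoneOnlyFilter(Iterator<Unfiltered> source) { this.source = source; }

        protected Unfiltered computeNext()
        {
            while (source.hasNext())
            {
                Unfiltered unfiltered = source.next();
                if (unfiltered.isRangeTombstoneMarker())
                    return unfiltered;              // range deletions always pass
                Row row = (Row) unfiltered;
                if (!row.deletion().isLive())       // keep the deletion, drop the cells
                    return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion());
            }
            return endOfData();
        }
    }
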
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index d26edfa..32d3156 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -167,6 +167,14 @@
 
     public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 
+    public static final Comparator<SSTableReader> sizeComparator = new Comparator<SSTableReader>()
+    {
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
+        }
+    };
+
     /**
      * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
      * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
@@ -1529,6 +1537,8 @@
     public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
     public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
 
+    public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly);
+
     /**
      * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
      */
@@ -2016,14 +2026,6 @@
         return 0;
     }
 
-    public static class SizeComparator implements Comparator<SSTableReader>
-    {
-        public int compare(SSTableReader o1, SSTableReader o2)
-        {
-            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
-        }
-    }
-
     public EncodingStats stats()
     {
         // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
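Replacing the instantiable SizeComparator class with a shared sizeComparator constant means call sites no longer allocate a comparator just to order sstables by on-disk size. A small usage sketch (the overlapping collection is hypothetical):

    // Sort candidate sstables smallest-first by their on-disk length.
    List<SSTableReader> bySize = new ArrayList<>(overlapping);
    bySize.sort(SSTableReader.sizeComparator);
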
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 7a7ce8c..8c64b01 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -30,13 +30,11 @@
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
@@ -120,6 +118,13 @@
     }
 
 
+    @SuppressWarnings("resource") // caller to close
+    @Override
+    public UnfilteredRowIterator simpleIterator(FileDataInput dfile, DecoratedKey key, RowIndexEntry position, boolean tombstoneOnly)
+    {
+        return SSTableIdentityIterator.create(this, dfile, position, key, tombstoneOnly);
+    }
+
     /**
      * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
      * allow key selection by token bounds but only if op != * EQ
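The new simpleIterator hook gives garbage collection a cheap forward-only view of a single partition; with tombstoneOnly set, only deletion-carrying content is deserialized. A hedged caller sketch, assuming dfile, key and indexEntry were obtained from the usual index lookup:

    try (UnfilteredRowIterator partition = sstable.simpleIterator(dfile, key, indexEntry, true))
    {
        while (partition.hasNext())
        {
            Unfiltered unfiltered = partition.next();
            // only range tombstone markers and rows with a non-live deletion arrive here
        }
    }
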
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 6d31844..66213a6 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -331,7 +331,7 @@
                             {
                                 dfile.seek(currentEntry.position);
                                 ByteBufferUtil.skipShortLength(dfile); // key
-                                return new SSTableIdentityIterator(sstable, dfile, partitionKey());
+                                return SSTableIdentityIterator.create(sstable, dfile, partitionKey());
                             }
 
                             ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey());
diff --git a/src/java/org/apache/cassandra/schema/CompactionParams.java b/src/java/org/apache/cassandra/schema/CompactionParams.java
index 720efa3..73271f1 100644
--- a/src/java/org/apache/cassandra/schema/CompactionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompactionParams.java
@@ -45,7 +45,8 @@
         CLASS,
         ENABLED,
         MIN_THRESHOLD,
-        MAX_THRESHOLD;
+        MAX_THRESHOLD,
+        PROVIDE_OVERLAPPING_TOMBSTONES;
 
         @Override
         public String toString()
@@ -54,27 +55,38 @@
         }
     }
 
+    public enum TombstoneOption
+    {
+        NONE,
+        ROW,
+        CELL;
+    }
+
     public static final int DEFAULT_MIN_THRESHOLD = 4;
     public static final int DEFAULT_MAX_THRESHOLD = 32;
 
     public static final boolean DEFAULT_ENABLED = true;
+    public static final TombstoneOption DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES =
+            TombstoneOption.valueOf(System.getProperty("default.provide.overlapping.tombstones", TombstoneOption.NONE.toString()).toUpperCase());
 
     public static final Map<String, String> DEFAULT_THRESHOLDS =
         ImmutableMap.of(Option.MIN_THRESHOLD.toString(), Integer.toString(DEFAULT_MIN_THRESHOLD),
                         Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD));
 
     public static final CompactionParams DEFAULT =
-        new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED);
+        new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED, DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES);
 
     private final Class<? extends AbstractCompactionStrategy> klass;
     private final ImmutableMap<String, String> options;
     private final boolean isEnabled;
+    private final TombstoneOption tombstoneOption;
 
-    private CompactionParams(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled)
+    private CompactionParams(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled, TombstoneOption tombstoneOption)
     {
         this.klass = klass;
         this.options = ImmutableMap.copyOf(options);
         this.isEnabled = isEnabled;
+        this.tombstoneOption = tombstoneOption;
     }
 
     public static CompactionParams create(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options)
@@ -82,6 +94,8 @@
         boolean isEnabled = options.containsKey(Option.ENABLED.toString())
                           ? Boolean.parseBoolean(options.get(Option.ENABLED.toString()))
                           : DEFAULT_ENABLED;
+        TombstoneOption tombstoneOption = TombstoneOption.valueOf(options.getOrDefault(Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString(),
+                                                                                       DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES.toString()).toUpperCase());
 
         Map<String, String> allOptions = new HashMap<>(options);
         if (supportsThresholdParams(klass))
@@ -90,7 +104,7 @@
             allOptions.putIfAbsent(Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD));
         }
 
-        return new CompactionParams(klass, allOptions, isEnabled);
+        return new CompactionParams(klass, allOptions, isEnabled, tombstoneOption);
     }
 
     public static CompactionParams scts(Map<String, String> options)
@@ -119,6 +133,11 @@
              : Integer.parseInt(threshold);
     }
 
+    public TombstoneOption tombstoneOption()
+    {
+        return tombstoneOption;
+    }
+
     public void validate()
     {
         try
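The new option is read with getOrDefault and upper-cased before the enum lookup, so schema values like 'row' and 'ROW' are equivalent. A minimal sketch of the round trip through the API added above (map contents assumed):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.cassandra.schema.CompactionParams;

    Map<String, String> options = new HashMap<>();
    options.put("provide_overlapping_tombstones", "row");   // parsed case-insensitively
    CompactionParams params = CompactionParams.scts(options);
    assert params.tombstoneOption() == CompactionParams.TombstoneOption.ROW;
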
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d64fc04..0a35296 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -75,6 +75,7 @@
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.repair.*;
 import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.service.paxos.CommitVerbHandler;
 import org.apache.cassandra.service.paxos.PrepareVerbHandler;
@@ -2809,6 +2810,19 @@
         return status.statusCode;
     }
 
+    public int garbageCollect(String tombstoneOptionString, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        TombstoneOption tombstoneOption = TombstoneOption.valueOf(tombstoneOptionString);
+        CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
+        for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
+        {
+            CompactionManager.AllSSTableOpStatus oneStatus = cfs.garbageCollect(tombstoneOption, jobs);
+            if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
+                status = oneStatus;
+        }
+        return status.statusCode;
+    }
+
     /**
      * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified.
      *
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index f7da817..abb10c1 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -290,6 +290,12 @@
     public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
 
     /**
+     * Rewrites all sstables from the given tables to remove deleted data.
+     * The tombstone option defines the granularity of the procedure: ROW removes deleted partitions and rows; CELL also removes overwritten or deleted cells.
+     */
+    public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+
+    /**
      * Flush all memtables for the given column families, or all columnfamilies for the given keyspace
      * if none are explicitly listed.
      * @param keyspaceName
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 84eeb04..c33dfa4 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -73,6 +73,7 @@
 import org.apache.cassandra.metrics.ThreadPoolMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.CacheServiceMBean;
 import org.apache.cassandra.service.GCInspector;
@@ -259,6 +260,11 @@
         return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames);
     }
 
+    public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        return ssProxy.garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames);
+    }
+
     private void checkJobs(PrintStream out, int jobs)
     {
         if (jobs > DatabaseDescriptor.getConcurrentCompactors())
@@ -301,7 +307,16 @@
         if (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames) != 0)
         {
             failed = true;
-            out.println("Aborted upgrading sstables for atleast one table in keyspace "+keyspaceName+", check server logs for more information.");
+            out.println("Aborted upgrading sstables for at least one table in keyspace " + keyspaceName + ", check server logs for more information.");
+        }
+    }
+
+    public void garbageCollect(PrintStream out, String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        if (garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames) != 0)
+        {
+            failed = true;
+            out.println("Aborted garbage collection for at least one table in keyspace " + keyspaceName + ", check server logs for more information.");
         }
     }
 
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 8640b58..cde4ee5 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -64,6 +64,7 @@
                 Verify.class,
                 Flush.class,
                 UpgradeSSTable.class,
+                GarbageCollect.class,
                 DisableAutoCompaction.class,
                 EnableAutoCompaction.class,
                 CompactionStats.class,
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
new file mode 100644
index 0000000..37daf09
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "garbagecollect", description = "Remove deleted data from one or more tables")
+public class GarbageCollect extends NodeToolCmd
+{
+    @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+    private List<String> args = new ArrayList<>();
+
+    @Option(title = "granularity",
+        name = {"-g", "--granularity"},
+        allowedValues = {"ROW", "CELL"},
+        description = "Granularity of garbage removal. ROW (default) removes deleted partitions and rows, CELL also removes overwritten or deleted cells.")
+    private String tombstoneOption = "ROW";
+
+    @Option(title = "jobs",
+            name = {"-j", "--jobs"},
+            description = "Number of sstables to cleanup simultanously, set to 0 to use all available compaction threads")
+    private int jobs = 2;
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        List<String> keyspaces = parseOptionalKeyspace(args, probe);
+        String[] tableNames = parseOptionalTables(args);
+
+        for (String keyspace : keyspaces)
+        {
+            try
+            {
+                probe.garbageCollect(System.out, tombstoneOption, jobs, keyspace, tableNames);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Error occurred during garbage collection", e);
+            }
+        }
+    }
+}
\ No newline at end of file
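The nodetool command above ultimately funnels into the NodeProbe call added earlier. A hedged sketch of driving the same operation programmatically; host, port, keyspace and table names are placeholders, and NodeProbe's plain host/port constructor is assumed:

    try (NodeProbe probe = new NodeProbe("127.0.0.1", 7199))
    {
        // ROW granularity, 2 concurrent jobs, one keyspace and table
        probe.garbageCollect(System.out, "ROW", 2, "mykeyspace", "mytable");
    }
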
diff --git a/test/long/org/apache/cassandra/cql3/GcCompactionBench.java b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
new file mode 100644
index 0000000..ca39b55
--- /dev/null
+++ b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Iterables;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class GcCompactionBench extends CQLTester
+{
+    private static final String SIZE_TIERED_STRATEGY = "SizeTieredCompactionStrategy', 'min_sstable_size' : '0";
+    private static final String LEVELED_STRATEGY = "LeveledCompactionStrategy', 'sstable_size_in_mb' : '16";
+
+    private static final int DEL_SECTIONS = 1000;
+    private static final int FLUSH_FREQ = 10000;
+    private static final int RANGE_FREQUENCY_INV = 16;
+    static final int COUNT = 90000;
+    static final int ITERS = 9;
+
+    static final int KEY_RANGE = 10;
+    static final int CLUSTERING_RANGE = 210000;
+
+    static final int EXTRA_SIZE = 1025;
+
+    // The name of this method is important!
+    // CommitLog settings must be applied before CQLTester sets up; by using the same name as its @BeforeClass method we
+    // are effectively overriding it.
+    @BeforeClass
+    public static void setUpClass()     // overrides CQLTester.setUpClass()
+    {
+        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(100);
+        CQLTester.setUpClass();
+    }
+
+    String hashQuery;
+
+    @Before
+    public void before() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                    "  key int," +
+                    "  column int," +
+                    "  data int," +
+                    "  extra text," +
+                    "  PRIMARY KEY(key, column)" +
+                    ")"
+                   );
+
+        String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, int",
+                " CREATE FUNCTION %s (state int, val int)" +
+                " CALLED ON NULL INPUT" +
+                " RETURNS int" +
+                " LANGUAGE java" +
+                " AS 'return val != null ? state * 17 + val : state;'")).name;
+        String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, text",
+                " CREATE FUNCTION %s (state int, val text)" +
+                " CALLED ON NULL INPUT" +
+                " RETURNS int" +
+                " LANGUAGE java" +
+                " AS 'return val != null ? state * 17 + val.hashCode() : state;'")).name;
+
+        String hashInt = createAggregate(KEYSPACE, "int",
+                " CREATE AGGREGATE %s (int)" +
+                " SFUNC " + hashIFunc +
+                " STYPE int" +
+                " INITCOND 1");
+        String hashText = createAggregate(KEYSPACE, "text",
+                " CREATE AGGREGATE %s (text)" +
+                " SFUNC " + hashTFunc +
+                " STYPE int" +
+                " INITCOND 1");
+
+        hashQuery = String.format("SELECT count(column), %s(key), %s(column), %s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s",
+                                  hashInt, hashInt, hashInt, hashText);
+    }
+    AtomicLong id = new AtomicLong();
+    long compactionTimeNanos = 0;
+
+    void pushData(Random rand, int count) throws Throwable
+    {
+        for (int i = 0; i < count; ++i)
+        {
+            long ii = id.incrementAndGet();
+            if (ii % 1000 == 0)
+                System.out.print('.');
+            int key = rand.nextInt(KEY_RANGE);
+            int column = rand.nextInt(CLUSTERING_RANGE);
+            execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", key, column, (int) ii, genExtra(rand));
+            maybeCompact(ii);
+        }
+    }
+
+    private String genExtra(Random rand)
+    {
+        StringBuilder builder = new StringBuilder(EXTRA_SIZE);
+        for (int i = 0; i < EXTRA_SIZE; ++i)
+            builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1)));
+        return builder.toString();
+    }
+
+    void deleteData(Random rand, int count) throws Throwable
+    {
+        for (int i = 0; i < count; ++i)
+        {
+            int key;
+            UntypedResultSet res;
+            long ii = id.incrementAndGet();
+            if (ii % 1000 == 0)
+                System.out.print('-');
+            if (rand.nextInt(RANGE_FREQUENCY_INV) != 1)
+            {
+                do
+                {
+                    key = rand.nextInt(KEY_RANGE);
+                    long cid = rand.nextInt(DEL_SECTIONS);
+                    int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
+                    int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
+                    res = execute("SELECT column FROM %s WHERE key = ? AND column >= ? AND column < ? LIMIT 1", key, cstart, cend);
+                } while (res.size() == 0);
+                UntypedResultSet.Row r = Iterables.get(res, rand.nextInt(res.size()));
+                int clustering = r.getInt("column");
+                execute("DELETE FROM %s WHERE key = ? AND column = ?", key, clustering);
+            }
+            else
+            {
+                key = rand.nextInt(KEY_RANGE);
+                long cid = rand.nextInt(DEL_SECTIONS);
+                int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
+                int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
+                res = execute("DELETE FROM %s WHERE key = ? AND column >= ? AND column < ?", key, cstart, cend);
+            }
+            maybeCompact(ii);
+        }
+    }
+
+    private void maybeCompact(long ii)
+    {
+        if (ii % FLUSH_FREQ == 0)
+        {
+            System.out.print("F");
+            flush();
+            if (ii % (FLUSH_FREQ * 10) == 0)
+            {
+                System.out.println("C");
+                long startTime = System.nanoTime();
+                getCurrentColumnFamilyStore().enableAutoCompaction(true);
+                long endTime = System.nanoTime();
+                compactionTimeNanos += endTime - startTime;
+                getCurrentColumnFamilyStore().disableAutoCompaction();
+            }
+        }
+    }
+
+    public void testGcCompaction(TombstoneOption tombstoneOption, TombstoneOption backgroundTombstoneOption, String compactionClass) throws Throwable
+    {
+        id.set(0);
+        compactionTimeNanos = 0;
+        alterTable("ALTER TABLE %s WITH compaction = { 'class' :  '" + compactionClass + "', 'provide_overlapping_tombstones' : '" + backgroundTombstoneOption + "'  };");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        long onStartTime = System.currentTimeMillis();
+        ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        List<Future<?>> tasks = new ArrayList<>();
+        for (int ti = 0; ti < 1; ++ti)
+        {
+            Random rand = new Random(ti);
+            tasks.add(es.submit(() ->
+            {
+                for (int i = 0; i < ITERS; ++i)
+                    try
+                    {
+                        pushData(rand, COUNT);
+                        deleteData(rand, COUNT / 3);
+                    }
+                    catch (Throwable e)
+                    {
+                        throw new AssertionError(e);
+                    }
+            }));
+        }
+        for (Future<?> task : tasks)
+            task.get();
+
+        flush();
+        long onEndTime = System.currentTimeMillis();
+        int startRowCount = countRows(cfs);
+        int startTombCount = countTombstoneMarkers(cfs);
+        int startRowDeletions = countRowDeletions(cfs);
+        int startTableCount = cfs.getLiveSSTables().size();
+        long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+        System.out.println();
+
+        String hashesBefore = getHashes();
+
+        long startTime = System.currentTimeMillis();
+        CompactionManager.instance.performGarbageCollection(cfs, tombstoneOption, 0);
+        long endTime = System.currentTimeMillis();
+
+        int endRowCount = countRows(cfs);
+        int endTombCount = countTombstoneMarkers(cfs);
+        int endRowDeletions = countRowDeletions(cfs);
+        int endTableCount = cfs.getLiveSSTables().size();
+        long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+
+        System.out.println(cfs.getCompactionParametersJson());
+        System.out.println(String.format("%s compactions completed in %.3fs",
+                tombstoneOption.toString(), (endTime - startTime) * 1e-3));
+        System.out.println(String.format("Operations completed in %.3fs, out of which %.3f for ongoing " + backgroundTombstoneOption + " background compactions",
+                (onEndTime - onStartTime) * 1e-3, compactionTimeNanos * 1e-9));
+        System.out.println(String.format("At start: %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers",
+                startTableCount, startSize, startRowCount, startRowDeletions, startTombCount));
+        System.out.println(String.format("At end:   %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers",
+                endTableCount, endSize, endRowCount, endRowDeletions, endTombCount));
+
+        String hashesAfter = getHashes();
+        Assert.assertEquals(hashesBefore, hashesAfter);
+    }
+
+    private String getHashes() throws Throwable
+    {
+        long startTime = System.currentTimeMillis();
+        String hashes = Arrays.toString(getRows(execute(hashQuery))[0]);
+        long endTime = System.currentTimeMillis();
+        System.out.println(String.format("Hashes: %s, retrieved in %.3fs", hashes, (endTime - startTime) * 1e-3));
+        return hashes;
+    }
+
+    @Test
+    public void testCellAtEnd() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testRowAtEnd() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testCellThroughout() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testRowThroughout() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testCopyCompaction() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testCellAtEndSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testRowAtEndSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testCellThroughoutSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testRowThroughoutSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testCopyCompactionSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+    }
+
+    int countTombstoneMarkers(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRangeTombstoneMarker());
+    }
+
+    int countRowDeletions(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive());
+    }
+
+    int countRows(ColumnFamilyStore cfs)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec));
+    }
+
+    private int count(ColumnFamilyStore cfs, Predicate<Unfiltered> predicate)
+    {
+        int count = 0;
+        for (SSTableReader reader : cfs.getLiveSSTables())
+            count += count(reader, predicate);
+        return count;
+    }
+
+    int count(SSTableReader reader, Predicate<Unfiltered> predicate)
+    {
+        int instances = 0;
+        try (ISSTableScanner partitions = reader.getScanner())
+        {
+            while (partitions.hasNext())
+            {
+                try (UnfilteredRowIterator iter = partitions.next())
+                {
+                    while (iter.hasNext())
+                    {
+                        Unfiltered atom = iter.next();
+                        if (predicate.test(atom))
+                            ++instances;
+                    }
+                }
+            }
+        }
+        return instances;
+    }
+}
diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
new file mode 100644
index 0000000..6fed033
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class GcCompactionTest extends CQLTester
+{
+    static final int KEY_COUNT = 10;
+    static final int CLUSTERING_COUNT = 20;
+
+    @Test
+    public void testGcCompactionPartitions() throws Throwable
+    {
+        runCompactionTest("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  data int," +
+                          "  extra text," +
+                          "  PRIMARY KEY((key, column), data)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row'  };"
+                          );
+
+    }
+
+    @Test
+    public void testGcCompactionRows() throws Throwable
+    {
+        runCompactionTest("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  data int," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key, column)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row'  };"
+                          );
+
+    }
+
+    @Test
+    public void testGcCompactionRanges() throws Throwable
+    {
+
+        runCompactionTest("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  col2 int," +
+                          "  data int," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key, column, data)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row'  };"
+                          );
+    }
+
+    private void runCompactionTest(String tableDef) throws Throwable
+    {
+        createTable(tableDef);
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int rowCount = countRows(table0);
+
+        deleteWithSomeInserts(3, 5, 10);
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        assertTrue(countRows(table1) > 0);
+        assertTrue(countTombstoneMarkers(table1) > 0);
+
+        deleteWithSomeInserts(5, 6, 0);
+        flush();
+        assertEquals(3, cfs.getLiveSSTables().size());
+        SSTableReader table2 = getNewTable(readers);
+        assertEquals(0, countRows(table2));
+        assertTrue(countTombstoneMarkers(table2) > 0);
+
+        CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+        assertEquals(3, cfs.getLiveSSTables().size());
+        SSTableReader table3 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table3));
+        assertTrue(rowCount > countRows(table3));
+    }
+
+    @Test
+    public void testGcCompactionCells() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  data int," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell'  };"
+                          );
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int cellCount = countCells(table0);
+
+        deleteWithSomeInserts(3, 0, 2);
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        assertTrue(countCells(table1) > 0);
+        assertEquals(0, countTombstoneMarkers(table0));
+
+        CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table3 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table3));
+        assertTrue(cellCount > countCells(table3));
+    }
+
+    @Test
+    public void testGcCompactionStatic() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  data int static," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key, column)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell'  };"
+                          );
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int cellCount = countStaticCells(table0);
+        assertEquals(KEY_COUNT, cellCount);
+
+        execute("DELETE data FROM %s WHERE key = 0");   // delete static cell
+        execute("INSERT INTO %s (key, data) VALUES (1, 0)");  // overwrite static cell
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        assertTrue(countStaticCells(table1) > 0);
+        assertEquals(0, countTombstoneMarkers(table0));
+
+        CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table3 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table3));
+        assertEquals(cellCount - 2, countStaticCells(table3));
+    }
+
+    @Test
+    public void testGcCompactionComplexColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  data map<int, int>," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell'  };"
+                          );
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("UPDATE %s SET data[?] = ? WHERE key = ?", j, i+j, i);
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int cellCount = countComplexCells(table0);
+
+        deleteWithSomeInsertsComplexColumn(3, 5, 8);
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        assertTrue(countComplexCells(table1) > 0);
+        assertEquals(0, countTombstoneMarkers(table0));
+
+        CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table3 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table3));
+        assertEquals(cellCount - 23, countComplexCells(table3));
+    }
+
+    @Test
+    public void testLocalDeletionTime() throws Throwable
+    {
+        createTable("create table %s (k int, c1 int, primary key (k, c1)) with compaction = {'class': 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones':'row'}");
+        execute("delete from %s where k = 1");
+        Set<SSTableReader> readers = new HashSet<>(getCurrentColumnFamilyStore().getLiveSSTables());
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        SSTableReader oldSSTable = getNewTable(readers);
+        Thread.sleep(2000);
+        execute("delete from %s where k = 1");
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        SSTableReader newTable = getNewTable(readers);
+
+        CompactionManager.instance.forceUserDefinedCompaction(oldSSTable.getFilename());
+
+        // Old table now doesn't contain any data and should disappear.
+        assertEquals(Collections.singleton(newTable), getCurrentColumnFamilyStore().getLiveSSTables());
+    }
+
+    private SSTableReader getNewTable(Set<SSTableReader> readers)
+    {
+        Set<SSTableReader> newOnes = new HashSet<>(getCurrentColumnFamilyStore().getLiveSSTables());
+        newOnes.removeAll(readers);
+        assertEquals(1, newOnes.size());
+        readers.addAll(newOnes);
+        return Iterables.get(newOnes, 0);
+    }
+
+    void deleteWithSomeInserts(int key_step, int delete_step, int readd_step) throws Throwable
+    {
+        for (int i = 0; i < KEY_COUNT; i += key_step)
+        {
+            if (delete_step > 0)
+                for (int j = i % delete_step; j < CLUSTERING_COUNT; j += delete_step)
+                {
+                    execute("DELETE FROM %s WHERE key = ? AND column = ?", i, j);
+                }
+            if (readd_step > 0)
+                for (int j = i % readd_step; j < CLUSTERING_COUNT; j += readd_step)
+                {
+                    execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i-j, "readded " + i + ":" + j);
+                }
+        }
+    }
+
+    void deleteWithSomeInsertsComplexColumn(int key_step, int delete_step, int readd_step) throws Throwable
+    {
+        for (int i = 0; i < KEY_COUNT; i += key_step)
+        {
+            if (delete_step > 0)
+                for (int j = i % delete_step; j < CLUSTERING_COUNT; j += delete_step)
+                {
+                    execute("DELETE data[?] FROM %s WHERE key = ?", j, i);
+                }
+            if (readd_step > 0)
+                for (int j = i % readd_step; j < CLUSTERING_COUNT; j += readd_step)
+                {
+                    execute("UPDATE %s SET data[?] = ? WHERE key = ?", j, -(i+j), i);
+                }
+        }
+    }
+
+    int countTombstoneMarkers(SSTableReader reader)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        return count(reader, x -> (x.isRangeTombstoneMarker() || x.isRow() && ((Row) x).hasDeletion(nowInSec)) ? 1 : 0, x -> x.partitionLevelDeletion().isLive() ? 0 : 1);
+    }
+
+    int countRows(SSTableReader reader)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        return count(reader, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec) ? 1 : 0, x -> 0);
+    }
+
+    int countCells(SSTableReader reader)
+    {
+        return count(reader, x -> x.isRow() ? Iterables.size((Row) x) : 0, x -> 0);
+    }
+
+    int countStaticCells(SSTableReader reader)
+    {
+        return count(reader, x -> 0, x -> Iterables.size(x.staticRow()));
+    }
+
+    int countComplexCells(SSTableReader reader)
+    {
+        return count(reader, x -> x.isRow() ? ((Row) x).stream().mapToInt(this::countComplex).sum() : 0, x -> 0);
+    }
+
+    int countComplex(ColumnData c)
+    {
+        if (!(c instanceof ComplexColumnData))
+            return 0;
+        ComplexColumnData ccd = (ComplexColumnData) c;
+        return ccd.cellsCount();
+    }
+
+    int count(SSTableReader reader, Function<Unfiltered, Integer> predicate, Function<UnfilteredRowIterator, Integer> partitionPredicate)
+    {
+        int instances = 0;
+        try (ISSTableScanner partitions = reader.getScanner())
+        {
+            while (partitions.hasNext())
+            {
+                try (UnfilteredRowIterator iter = partitions.next())
+                {
+                    instances += partitionPredicate.apply(iter);
+                    while (iter.hasNext())
+                    {
+                        Unfiltered atom = iter.next();
+                        instances += predicate.apply(atom);
+                    }
+                }
+            }
+        }
+        return instances;
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index c930b2a..ece2d1d 100644
--- a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@ -51,7 +51,7 @@
     String functionName;
 
     @BeforeClass
-    public static void setUpClass()
+    public static void setUpClass()     // overrides CQLTester.setUpClass()
     {
         DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
 
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
new file mode 100644
index 0000000..2189e15
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import static org.junit.Assert.*;
+
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.*;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+public class CompactionIteratorTest
+{
+
+    private static final int NOW = 1000;
+    private static final int GC_BEFORE = 100;
+    private static final String KSNAME = "CompactionIteratorTest";
+    private static final String CFNAME = "Integer1";
+
+    static final DecoratedKey kk = Util.dk("key");
+    static final CFMetaData metadata;
+    private static final int RANGE = 1000;
+    private static final int COUNT = 100;
+
+    Map<List<Unfiltered>, DeletionTime> deletionTimes = new HashMap<>();
+
+    static {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KSNAME,
+                                    KeyspaceParams.simple(1),
+                                    metadata = SchemaLoader.standardCFMD(KSNAME,
+                                                                         CFNAME,
+                                                                         1,
+                                                                         UTF8Type.instance,
+                                                                         Int32Type.instance,
+                                                                         Int32Type.instance));
+    }
+
+    // See org.apache.cassandra.db.rows.UnfilteredRowsGenerator.parse for the syntax used in these tests.
+
+    @Test
+    public void testGcCompactionSupersedeLeft()
+    {
+        testCompaction(new String[] {
+            "5<=[140] 10[150] [140]<20 22<[130] [130]<25 30[150]"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120]"
+        },
+        3);
+    }
+
+    @Test
+    public void testGcCompactionSupersedeMiddle()
+    {
+        testCompaction(new String[] {
+            "5<=[140] 10[150] [140]<40 60[150]"
+        }, new String[] {
+            "7<=[160] 15[180] [160]<=30 40[120]"
+        },
+        3);
+    }
+
+    @Test
+    public void testGcCompactionSupersedeRight()
+    {
+        testCompaction(new String[] {
+            "9<=[140] 10[150] [140]<40 60[150]"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120]"
+        },
+        3);
+    }
+
+    @Test
+    public void testGcCompactionSwitchInSuperseded()
+    {
+        testCompaction(new String[] {
+            "5<=[140] 10[150] [140]<20 20<=[170] [170]<=50 60[150]"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120]"
+        },
+        5);
+    }
+
+    @Test
+    public void testGcCompactionBoundaries()
+    {
+        testCompaction(new String[] {
+            "5<=[120] [120]<9 9<=[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120] 45<[140] [140]<80 88<=[130] [130]<100"
+        },
+        7);
+    }
+
+    @Test
+    public void testGcCompactionMatches()
+    {
+        testCompaction(new String[] {
+            "5<=[120] [120]<=9 9<[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90 120<=[100] [100]<130"
+        }, new String[] {
+            "9<[160] 15[180] [160]<40 40[120] 45<[140] [140]<90 90<=[110] [110]<100 120<=[100] [100]<130"
+        },
+        5);
+    }
+
+    @Test
+    public void testGcCompactionRowDeletion()
+    {
+        testCompaction(new String[] {
+            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
+        }, new String[] {
+            "10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
+        },
+        "25[160] 30[170] 50[120]");
+    }
+
+    @Test
+    public void testGcCompactionPartitionDeletion()
+    {
+        testCompaction(new String[] {
+            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
+        }, new String[] {
+            // Dxx| stands for partition deletion at time xx
+            "D165|10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
+        },
+        "30[170]");
+    }
+
+    void testCompaction(String[] inputs, String[] tombstones, String expected)
+    {
+        testNonGcCompaction(inputs, tombstones);
+
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(inputs, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+        List<Unfiltered> result = compact(inputLists, tombstoneLists);
+        System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
+        generator.verifyValid(result);
+        verifyEquivalent(inputLists, result, tombstoneLists, generator);
+        List<Unfiltered> expectedResult = generator.parse(expected, NOW - 1);
+        if (!expectedResult.equals(result))
+            fail("Expected " + expected + ", got " + generator.str(result));
+    }
+
+    void testCompaction(String[] inputs, String[] tombstones, int expectedCount)
+    {
+        testNonGcCompaction(inputs, tombstones);
+
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(inputs, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+        List<Unfiltered> result = compact(inputLists, tombstoneLists);
+        System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
+        generator.verifyValid(result);
+        verifyEquivalent(inputLists, result, tombstoneLists, generator);
+        if (size(result) > expectedCount)
+            fail("Expected compaction with " + expectedCount + " elements, got " + size(result) + ": " + generator.str(result));
+    }
+
+    int testNonGcCompaction(String[] inputs, String[] tombstones)
+    {
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(inputs, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+        List<Unfiltered> result = compact(inputLists, Collections.emptyList());
+        System.out.println("Non-GC compaction resulted in " + size(result) + " Unfiltereds");
+        generator.verifyValid(result);
+        verifyEquivalent(inputLists, result, tombstoneLists, generator);
+        return size(result);
+    }
+
+    private static int size(List<Unfiltered> data)
+    {
+        return data.stream().mapToInt(x -> x instanceof RangeTombstoneBoundaryMarker ? 2 : 1).sum();
+    }
+
+    private void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> result, List<List<Unfiltered>> tombstoneSources, UnfilteredRowsGenerator generator)
+    {
+        // Compacting sources together with tombstoneSources must produce the same content as compacting result together with tombstoneSources.
+        List<Unfiltered> expected = compact(Iterables.concat(sources, tombstoneSources), Collections.emptyList());
+        List<Unfiltered> actual = compact(Iterables.concat(ImmutableList.of(result), tombstoneSources), Collections.emptyList());
+        if (!expected.equals(actual))
+        {
+            System.out.println("Equivalence test failure between sources:");
+            for (List<Unfiltered> partition : sources)
+                generator.dumpList(partition);
+            System.out.println("and compacted " + generator.str(result));
+            System.out.println("with tombstone sources:");
+            for (List<Unfiltered> partition : tombstoneSources)
+                generator.dumpList(partition);
+            System.out.println("expected " + generator.str(expected));
+            System.out.println("got " + generator.str(actual));
+            fail("Failed equivalence test.");
+        }
+    }
+
+    private List<List<Unfiltered>> parse(String[] inputs, UnfilteredRowsGenerator generator)
+    {
+        return ImmutableList.copyOf(Lists.transform(Arrays.asList(inputs), x -> parse(x, generator)));
+    }
+
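+    // Parses a source spec, honoring an optional "Dxx|" prefix which applies a partition-level deletion at time xx.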
+    private List<Unfiltered> parse(String input, UnfilteredRowsGenerator generator)
+    {
+        Matcher m = Pattern.compile("D(\\d+)\\|").matcher(input);
+        if (m.lookingAt())
+        {
+            int del = Integer.parseInt(m.group(1));
+            input = input.substring(m.end());
+            List<Unfiltered> list = generator.parse(input, NOW - 1);
+            deletionTimes.put(list, new DeletionTime(del, del));
+            return list;
+        }
+        else
+            return generator.parse(input, NOW - 1);
+    }
+
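+    /**
+     * Feeds the sources through a CompactionIterator whose controller exposes the given tombstone
+     * sources as shadow sources, and collects the contents of the single resulting partition.
+     */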
+    private List<Unfiltered> compact(Iterable<List<Unfiltered>> sources, Iterable<List<Unfiltered>> tombstoneSources)
+    {
+        List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(sources, list -> ImmutableList.of(listToIterator(list, kk))));
+        Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>();
+        transformedSources.put(kk, Iterables.transform(tombstoneSources, list -> listToIterator(list, kk)));
+        try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
+             CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+                                                              Lists.transform(content, x -> new Scanner(x)),
+                                                              controller, NOW, null))
+        {
+            List<Unfiltered> result = new ArrayList<>();
+            assertTrue(iter.hasNext());
+            try (UnfilteredRowIterator partition = iter.next())
+            {
+                Iterators.addAll(result, partition);
+            }
+            assertFalse(iter.hasNext());
+            return result;
+        }
+    }
+
+    private UnfilteredRowIterator listToIterator(List<Unfiltered> list, DecoratedKey key)
+    {
+        return UnfilteredRowsGenerator.source(list, metadata, key, deletionTimes.getOrDefault(list, DeletionTime.LIVE));
+    }
+
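+    // Builds up to pcount random partitions of rcount unfiltereds each; repeated keys overwrite earlier entries.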
+    NavigableMap<DecoratedKey, List<Unfiltered>> generateContent(Random rand, UnfilteredRowsGenerator generator,
+                                                                 List<DecoratedKey> keys, int pcount, int rcount)
+    {
+        NavigableMap<DecoratedKey, List<Unfiltered>> map = new TreeMap<>();
+        for (int i = 0; i < pcount; ++i)
+        {
+            DecoratedKey key = keys.get(rand.nextInt(keys.size()));
+            map.put(key, generator.generateSource(rand, rcount, RANGE, NOW - 5, x -> NOW - 1));
+        }
+        return map;
+    }
+
+    @Test
+    public void testRandom()
+    {
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        for (int seed = 1; seed < 100; ++seed)
+        {
+            Random rand = new Random(seed);
+            List<List<Unfiltered>> sources = new ArrayList<>();
+            for (int i = 0; i < 10; ++i)
+                sources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15));
+            int srcSz = sources.stream().mapToInt(CompactionIteratorTest::size).sum();
+            List<List<Unfiltered>> tombSources = new ArrayList<>();
+            for (int i = 0; i < 10; ++i)
+                tombSources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15));
+            List<Unfiltered> result = compact(sources, tombSources);
+            verifyEquivalent(sources, result, tombSources, generator);
+            assertTrue(size(result) < srcSz);
+        }
+    }
+
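+    /**
+     * Controller stub that serves the given per-key iterators as the overlapping tombstone sources
+     * a garbage-collecting compaction reads deletions and overwrites from.
+     */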
+    class Controller extends CompactionController
+    {
+        private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources;
+
+        public Controller(ColumnFamilyStore cfs, Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources, int gcBefore)
+        {
+            super(cfs, Collections.emptySet(), gcBefore);
+            this.tombstoneSources = tombstoneSources;
+        }
+
+        @Override
+        public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly)
+        {
+            assert tombstoneOnly;
+            return tombstoneSources.get(key);
+        }
+    }
+
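+    /**
+     * Minimal in-memory ISSTableScanner; byte length and position are irrelevant for these tests,
+     * so they are reported as zero.
+     */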
+    class Scanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner
+    {
+        Iterator<UnfilteredRowIterator> iter;
+
+        Scanner(Iterable<UnfilteredRowIterator> content)
+        {
+            iter = content.iterator();
+        }
+
+        @Override
+        public boolean isForThrift()
+        {
+            return false;
+        }
+
+        @Override
+        public CFMetaData metadata()
+        {
+            return metadata;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return iter.hasNext();
+        }
+
+        @Override
+        public UnfilteredRowIterator next()
+        {
+            return iter.next();
+        }
+
+        @Override
+        public long getLengthInBytes()
+        {
+            return 0;
+        }
+
+        @Override
+        public long getCurrentPosition()
+        {
+            return 0;
+        }
+
+        @Override
+        public String getBackingFiles()
+        {
+            return null;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index 0eeb379..3335c02 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -96,9 +96,11 @@
     @SuppressWarnings("unused")
     public void testTombstoneMerge(boolean reversed, boolean iterations)
     {
+        this.reversed = reversed;
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(comparator, reversed);
+
         for (int seed = 1; seed <= 100; ++seed)
         {
-            this.reversed = reversed;
             if (ITEMS <= 20)
                 System.out.println("\nSeed " + seed);
 
@@ -112,27 +114,29 @@
             if (ITEMS <= 20)
                 System.out.println("Merging");
             for (int i=0; i<ITERATORS; ++i)
-                sources.add(generateSource(r, timeGenerators.get(r.nextInt(timeGenerators.size()))));
+                sources.add(generator.generateSource(r, ITEMS, RANGE, DEL_RANGE, timeGenerators.get(r.nextInt(timeGenerators.size()))));
             List<Unfiltered> merged = merge(sources, iterations);
     
             if (ITEMS <= 20)
                 System.out.println("results in");
             if (ITEMS <= 20)
-                dumpList(merged);
-            verifyEquivalent(sources, merged);
-            verifyValid(merged);
+                generator.dumpList(merged);
+            verifyEquivalent(sources, merged, generator);
+            generator.verifyValid(merged);
             if (reversed)
             {
                 Collections.reverse(merged);
-                this.reversed = false;
-                verifyValid(merged);
+                generator.verifyValid(merged, false);
             }
         }
     }
 
     private List<Unfiltered> merge(List<List<Unfiltered>> sources, boolean iterations)
     {
-        List<UnfilteredRowIterator> us = sources.stream().map(l -> new Source(l.iterator())).collect(Collectors.toList());
+        List<UnfilteredRowIterator> us = sources
+                .stream()
+                .map(l -> new UnfilteredRowsGenerator.Source(l.iterator(), metadata, partitionKey, DeletionTime.LIVE, reversed))
+                .collect(Collectors.toList());
         List<Unfiltered> merged = new ArrayList<>();
         Iterators.addAll(merged, mergeIterators(us, iterations));
         return merged;
@@ -285,24 +289,24 @@
         }
     }
 
-    void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> merged)
+    void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> merged, UnfilteredRowsGenerator generator)
     {
         try
         {
             for (int i=0; i<RANGE; ++i)
             {
-                Clusterable c = clusteringFor(i);
+                Clusterable c = UnfilteredRowsGenerator.clusteringFor(i);
                 DeletionTime dt = DeletionTime.LIVE;
                 for (List<Unfiltered> source : sources)
                 {
                     dt = deletionFor(c, source, dt);
                 }
-                Assert.assertEquals("Deletion time mismatch for position " + str(c), dt, deletionFor(c, merged));
+                Assert.assertEquals("Deletion time mismatch for position " + i, dt, deletionFor(c, merged));
                 if (dt == DeletionTime.LIVE)
                 {
                     Optional<Unfiltered> sourceOpt = sources.stream().map(source -> rowFor(c, source)).filter(x -> x != null).findAny();
                     Unfiltered mergedRow = rowFor(c, merged);
-                    Assert.assertEquals("Content mismatch for position " + str(c), str(sourceOpt.orElse(null)), str(mergedRow));
+                    Assert.assertEquals("Content mismatch for position " + i, clustering(sourceOpt.orElse(null)), clustering(mergedRow));
                 }
             }
         }
@@ -310,13 +314,20 @@
         {
             System.out.println(e);
             for (List<Unfiltered> list : sources)
-                dumpList(list);
+                generator.dumpList(list);
             System.out.println("merged");
-            dumpList(merged);
+            generator.dumpList(merged);
             throw e;
         }
     }
 
+    String clustering(Clusterable curr)
+    {
+        if (curr == null)
+            return "null";
+        return Int32Type.instance.getString(curr.clustering().get(0));
+    }
+
     private Unfiltered rowFor(Clusterable pointer, List<Unfiltered> list)
     {
         int index = Collections.binarySearch(list, pointer, reversed ? comparator.reversed() : comparator);
@@ -424,21 +435,23 @@
 
     public void testForInput(String... inputs)
     {
+        reversed = false;
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(comparator, false);
+
         List<List<Unfiltered>> sources = new ArrayList<>();
         for (String input : inputs)
         {
-            List<Unfiltered> source = parse(input);
-            attachBoundaries(source);
-            dumpList(source);
-            verifyValid(source);
+            List<Unfiltered> source = generator.parse(input, DEL_RANGE);
+            generator.dumpList(source);
+            generator.verifyValid(source);
             sources.add(source);
         }
 
         List<Unfiltered> merged = merge(sources, false);
         System.out.println("Merge to:");
-        dumpList(merged);
-        verifyEquivalent(sources, merged);
-        verifyValid(merged);
+        generator.dumpList(merged);
+        verifyEquivalent(sources, merged, generator);
+        generator.verifyValid(merged);
         System.out.println();
     }
 
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
new file mode 100644
index 0000000..7cdccdb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.rows.Unfiltered.Kind;
+import org.apache.cassandra.utils.btree.BTree;
+
+public class UnfilteredRowsGenerator
+{
+    final boolean reversed;
+    final Comparator<Clusterable> comparator;
+
+    public UnfilteredRowsGenerator(Comparator<Clusterable> comparator, boolean reversed)
+    {
+        this.reversed = reversed;
+        this.comparator = comparator;
+    }
+
+    String str(Clusterable curr)
+    {
+        if (curr == null)
+            return "null";
+        String val = Int32Type.instance.getString(curr.clustering().get(0));
+        if (curr instanceof RangeTombstoneMarker)
+        {
+            RangeTombstoneMarker marker = (RangeTombstoneMarker) curr;
+            if (marker.isClose(reversed))
+                val = "[" + marker.closeDeletionTime(reversed).markedForDeleteAt() + "]" + (marker.closeIsInclusive(reversed) ? "<=" : "<") + val;
+            if (marker.isOpen(reversed))
+                val = val + (marker.openIsInclusive(reversed) ? "<=" : "<") + "[" + marker.openDeletionTime(reversed).markedForDeleteAt() + "]";
+        }
+        else if (curr instanceof Row)
+        {
+            Row row = (Row) curr;
+            String delTime = "";
+            if (!row.deletion().time().isLive())
+                delTime = "D" + row.deletion().time().markedForDeleteAt();
+            val = val + "[" + row.primaryKeyLivenessInfo().timestamp() + delTime + "]";
+        }
+        return val;
+    }
+
+    public void verifyValid(List<Unfiltered> list)
+    {
+        verifyValid(list, reversed);
+    }
+
+    void verifyValid(List<Unfiltered> list, boolean reversed)
+    {
+        int reversedAsMultiplier = reversed ? -1 : 1;
+        try
+        {
+            RangeTombstoneMarker prev = null;
+            Unfiltered prevUnfiltered = null;
+            for (Unfiltered unfiltered : list)
+            {
+                Assert.assertTrue("Order violation prev " + str(prevUnfiltered) + " curr " + str(unfiltered),
+                                  prevUnfiltered == null || comparator.compare(prevUnfiltered, unfiltered) * reversedAsMultiplier < 0);
+                prevUnfiltered = unfiltered;
+
+                if (unfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER)
+                {
+                    RangeTombstoneMarker curr = (RangeTombstoneMarker) unfiltered;
+                    if (prev != null)
+                    {
+                        if (curr.isClose(reversed))
+                        {
+                            Assert.assertTrue(str(unfiltered) + " follows another close marker " + str(prev), prev.isOpen(reversed));
+                            Assert.assertEquals("Deletion time mismatch for open " + str(prev) + " and close " + str(unfiltered),
+                                                prev.openDeletionTime(reversed),
+                                                curr.closeDeletionTime(reversed));
+                        }
+                        else
+                            Assert.assertFalse(str(curr) + " follows another open marker " + str(prev), prev.isOpen(reversed));
+                    }
+
+                    prev = curr;
+                }
+            }
+            Assert.assertFalse("Cannot end in open marker " + str(prev), prev != null && prev.isOpen(reversed));
+
+        }
+        catch (AssertionError e)
+        {
+            System.out.println(e);
+            dumpList(list);
+            throw e;
+        }
+    }
+
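+    /**
+     * Generates a random but valid sequence of rows and range tombstones with clusterings in
+     * [0, range), using {@code timeGenerator} for row timestamps and {@code del_range} as the
+     * bound for deletion times. The content is reversed if the generator is in reversed mode.
+     */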
+    public List<Unfiltered> generateSource(Random r, int items, int range, int del_range, Function<Integer, Integer> timeGenerator)
+    {
+        int[] positions = new int[items + 1];
+        for (int i=0; i<items; ++i)
+            positions[i] = r.nextInt(range);
+        positions[items] = range;
+        Arrays.sort(positions);
+
+        List<Unfiltered> content = new ArrayList<>(items);
+        int prev = -1;
+        for (int i=0; i<items; ++i)
+        {
+            int pos = positions[i];
+            int sz = positions[i + 1] - pos;
+            if (sz == 0 && pos == prev)
+                // Filter out more than two of the same position.
+                continue;
+            if (r.nextBoolean() || pos == prev)
+            {
+                int span;
+                boolean includesStart;
+                boolean includesEnd;
+                if (pos > prev)
+                {
+                    span = r.nextInt(sz + 1);
+                    includesStart = span > 0 ? r.nextBoolean() : true;
+                    includesEnd = span > 0 ? r.nextBoolean() : true;
+                }
+                else
+                {
+                    span = 1 + r.nextInt(sz);
+                    includesStart = false;
+                    includesEnd = r.nextBoolean();
+                }
+                int deltime = r.nextInt(del_range);
+                DeletionTime dt = new DeletionTime(deltime, deltime);
+                content.add(new RangeTombstoneBoundMarker(boundFor(pos, true, includesStart), dt));
+                content.add(new RangeTombstoneBoundMarker(boundFor(pos + span, false, includesEnd), dt));
+                prev = pos + span - (includesEnd ? 0 : 1);
+            }
+            else
+            {
+                content.add(emptyRowAt(pos, timeGenerator));
+                prev = pos;
+            }
+        }
+
+        attachBoundaries(content);
+        if (reversed)
+        {
+            Collections.reverse(content);
+        }
+        verifyValid(content);
+        if (items <= 20)
+            dumpList(content);
+        return content;
+    }
+
+    /**
+     * Constructs a list of unfiltereds with integer clustering according to the specification string.
+     *
+     * The string is a space-delimited sorted list that can contain:
+     *  * open tombstone markers, e.g. xx<[yy] where xx is the clustering, yy is the deletion time, and "<" stands for
+     *    non-inclusive (<= for inclusive).
+     *  * close tombstone markers, e.g. [yy]<=xx. Adjacent close and open markers (e.g. [yy]<=xx xx<[zz]) are combined
+     *    into boundary markers.
+     *  * empty rows, e.g. xx or xx[yy] or xx[yyDzz] where xx is the clustering, yy is the liveness timestamp and zz
+     *    is the deletion time.
+     *
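+     * For example, "10[100] 15<=[140] [140]<=20 30[150D160]" is a live row at 10, a range tombstone
+     * with deletion time 140 covering [15, 20], and a row at 30 written at 150 and deleted at 160.
+     *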
+     * @param input Specification.
+     * @param default_liveness Liveness to use for rows if not explicitly specified.
+     * @return Parsed list.
+     */
+    public List<Unfiltered> parse(String input, int default_liveness)
+    {
+        String[] split = input.split(" ");
+        Pattern open = Pattern.compile("(\\d+)<(=)?\\[(\\d+)\\]");
+        Pattern close = Pattern.compile("\\[(\\d+)\\]<(=)?(\\d+)");
+        Pattern row = Pattern.compile("(\\d+)(\\[(\\d+)(?:D(\\d+))?\\])?");
+        List<Unfiltered> out = new ArrayList<>(split.length);
+        for (String s : split)
+        {
+            Matcher m = open.matcher(s);
+            if (m.matches())
+            {
+                out.add(openMarker(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(3)), m.group(2) != null));
+                continue;
+            }
+            m = close.matcher(s);
+            if (m.matches())
+            {
+                out.add(closeMarker(Integer.parseInt(m.group(3)), Integer.parseInt(m.group(1)), m.group(2) != null));
+                continue;
+            }
+            m = row.matcher(s);
+            if (m.matches())
+            {
+                int live = m.group(3) != null ? Integer.parseInt(m.group(3)) : default_liveness;
+                int delTime = m.group(4) != null ? Integer.parseInt(m.group(4)) : -1;
+                out.add(emptyRowAt(Integer.parseInt(m.group(1)), live, delTime));
+                continue;
+            }
+            Assert.fail("Can't parse " + s);
+        }
+        attachBoundaries(out);
+        return out;
+    }
+
+    static Row emptyRowAt(int pos, Function<Integer, Integer> timeGenerator)
+    {
+        final Clustering clustering = clusteringFor(pos);
+        final LivenessInfo live = LivenessInfo.create(timeGenerator.apply(pos), UnfilteredRowIteratorsMergeTest.nowInSec);
+        return BTreeRow.noCellLiveRow(clustering, live);
+    }
+
+    static Row emptyRowAt(int pos, int time, int deletionTime)
+    {
+        final Clustering clustering = clusteringFor(pos);
+        final LivenessInfo live = LivenessInfo.create(time, UnfilteredRowIteratorsMergeTest.nowInSec);
+        final DeletionTime delTime = deletionTime == -1 ? DeletionTime.LIVE : new DeletionTime(deletionTime, deletionTime);
+        return BTreeRow.create(clustering, live, Row.Deletion.regular(delTime), BTree.empty());
+    }
+
+    static Clustering clusteringFor(int i)
+    {
+        return Clustering.make(Int32Type.instance.decompose(i));
+    }
+
+    static ClusteringBound boundFor(int pos, boolean start, boolean inclusive)
+    {
+        return ClusteringBound.create(ClusteringBound.boundKind(start, inclusive), new ByteBuffer[] {Int32Type.instance.decompose(pos)});
+    }
+
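+    /**
+     * Joins each adjacent close/open marker pair at the same position into a single boundary
+     * marker, in place, without going through the merge machinery under test.
+     */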
+    static void attachBoundaries(List<Unfiltered> content)
+    {
+        int di = 0;
+        RangeTombstoneMarker prev = null;
+        for (int si = 0; si < content.size(); ++si)
+        {
+            Unfiltered currUnfiltered = content.get(si);
+            RangeTombstoneMarker curr = currUnfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER ?
+                                        (RangeTombstoneMarker) currUnfiltered :
+                                        null;
+            if (prev != null && curr != null && prev.isClose(false) && curr.isOpen(false) && prev.clustering().invert().equals(curr.clustering()))
+            {
+                // Join. Prefer not to use merger to check its correctness.
+                ClusteringBound b = (ClusteringBound) prev.clustering();
+                ClusteringBoundary boundary = ClusteringBoundary.create(
+                        b.isInclusive() ? ClusteringBound.Kind.INCL_END_EXCL_START_BOUNDARY : ClusteringBound.Kind.EXCL_END_INCL_START_BOUNDARY,
+                        b.getRawValues());
+                prev = new RangeTombstoneBoundaryMarker(boundary, prev.closeDeletionTime(false), curr.openDeletionTime(false));
+                currUnfiltered = prev;
+                --di;
+            }
+            content.set(di++, currUnfiltered);
+            prev = curr;
+        }
+        for (int pos = content.size() - 1; pos >= di; --pos)
+            content.remove(pos);
+    }
+
+    static RangeTombstoneMarker openMarker(int pos, int delTime, boolean inclusive)
+    {
+        return marker(pos, delTime, true, inclusive);
+    }
+
+    static RangeTombstoneMarker closeMarker(int pos, int delTime, boolean inclusive)
+    {
+        return marker(pos, delTime, false, inclusive);
+    }
+
+    private static RangeTombstoneMarker marker(int pos, int delTime, boolean isStart, boolean inclusive)
+    {
+        return new RangeTombstoneBoundMarker(ClusteringBound.create(ClusteringBound.boundKind(isStart, inclusive),
+                                                                    new ByteBuffer[] {clusteringFor(pos).get(0)}),
+                                             new DeletionTime(delTime, delTime));
+    }
+
+    public static UnfilteredRowIterator source(Iterable<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey)
+    {
+        return source(content, metadata, partitionKey, DeletionTime.LIVE);
+    }
+
+    public static UnfilteredRowIterator source(Iterable<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey, DeletionTime delTime)
+    {
+        return new Source(content.iterator(), metadata, partitionKey, delTime, false);
+    }
+
+    static class Source extends AbstractUnfilteredRowIterator
+    {
+        Iterator<Unfiltered> content;
+
+        protected Source(Iterator<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey, DeletionTime partitionLevelDeletion, boolean reversed)
+        {
+            super(metadata,
+                  partitionKey,
+                  partitionLevelDeletion,
+                  metadata.partitionColumns(),
+                  Rows.EMPTY_STATIC_ROW,
+                  reversed,
+                  EncodingStats.NO_STATS);
+            this.content = content;
+        }
+
+        @Override
+        protected Unfiltered computeNext()
+        {
+            return content.hasNext() ? content.next() : endOfData();
+        }
+    }
+
+    public String str(List<Unfiltered> list)
+    {
+        StringBuilder builder = new StringBuilder();
+        for (Unfiltered u : list)
+        {
+            builder.append(str(u));
+            builder.append(' ');
+        }
+        return builder.toString();
+    }
+
+    public void dumpList(List<Unfiltered> list)
+    {
+        System.out.println(str(list));
+    }
+}
\ No newline at end of file