Garbage-collecting compaction operation and schema option.
patch by Branimir Lambov; reviewed by Marcus Eriksson for CASSANDRA-7019
diff --git a/CHANGES.txt b/CHANGES.txt
index 36a21e6..760cc58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
* Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
* Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
* Support filtering on non-PRIMARY KEY columns in the CREATE
diff --git a/NEWS.txt b/NEWS.txt
index 8580c7c..85f2767 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,28 @@
New features
------------
+ - Compaction can now take into account overlapping tables that don't take part
+ in the compaction to look for deleted or overwritten data in the compacted tables.
+ When such data is found, it can be safely discarded, which in turn should enable
+ the removal of tombstones over that data.
+
+ The behavior can be engaged in two ways:
+ - as a "nodetool garbagecollect -g CELL/ROW" operation, which applies
+ single-table compaction on all sstables to discard deleted data in one step.
+ - as a "provide_overlapping_tombstones:CELL/ROW/NONE" compaction strategy flag,
+ which uses overlapping tables as a source of deletions/overwrites during all
+ compactions.
+ The argument specifies the granularity at which deleted data is to be found:
+ - If ROW is specified, only whole deleted rows (or sets of rows) will be
+ discarded.
+ - If CELL is specified, any columns whose value is overwritten or deleted
+ will also be discarded.
+ - NONE (default) specifies the old behavior: overlapping tables are not used to
+ decide when to discard data.
+ Which option to use depends on your workload; both ROW and CELL increase the
+ disk load on compaction (especially with the size-tiered compaction strategy),
+ with CELL being more resource-intensive. Both should lead to better read
+ performance if deleting rows (resp. overwriting or deleting cells) is common.
- Prepared statements are now persisted in the table prepared_statements in
the system keyspace. Upon startup, this table is used to preload all
previously prepared statements - i.e. in many cases clients do not need to
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 594da98..20dac1e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -71,6 +71,7 @@
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.metrics.TableMetrics.Sampler;
import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
@@ -1580,6 +1581,11 @@
return CompactionManager.instance.relocateSSTables(this, jobs);
}
+ public CompactionManager.AllSSTableOpStatus garbageCollect(TombstoneOption tombstoneOption, int jobs) throws ExecutionException, InterruptedException
+ {
+ return CompactionManager.instance.performGarbageCollection(this, tombstoneOption, jobs);
+ }
+
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
{
assert !sstables.isEmpty();
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 4728ec3..83592f0 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -44,6 +44,7 @@
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
@@ -492,6 +493,7 @@
uncheckedOptions.remove(LOG_ALL_OPTION);
uncheckedOptions.remove(COMPACTION_ENABLED);
uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES);
+ uncheckedOptions.remove(CompactionParams.Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString());
return uncheckedOptions;
}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e42e7a1..08ad0c0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -20,18 +20,23 @@
import java.util.*;
import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.AlwaysPresentFilter;
-
import org.apache.cassandra.utils.OverlapIterator;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -53,6 +58,10 @@
private Refs<SSTableReader> overlappingSSTables;
private OverlapIterator<PartitionPosition, SSTableReader> overlapIterator;
private final Iterable<SSTableReader> compacting;
+ private final RateLimiter limiter;
+ private final long minTimestamp;
+ final TombstoneOption tombstoneOption;
+ final Map<SSTableReader, FileDataInput> openDataFiles = new HashMap<>();
public final int gcBefore;
@@ -63,11 +72,23 @@
public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore)
{
+ this(cfs, compacting, gcBefore,
+ CompactionManager.instance.getRateLimiter(),
+ cfs.getCompactionStrategyManager().getCompactionParams().tombstoneOption());
+ }
+
+ public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption)
+ {
assert cfs != null;
this.cfs = cfs;
this.gcBefore = gcBefore;
this.compacting = compacting;
+ this.limiter = limiter;
compactingRepaired = compacting != null && compacting.stream().allMatch(SSTableReader::isRepaired);
+ this.tombstoneOption = tombstoneOption;
+ this.minTimestamp = compacting != null && !compacting.isEmpty() // check needed for test
+ ? compacting.stream().mapToLong(SSTableReader::getMinTimestamp).min().getAsLong()
+ : 0;
refreshOverlaps();
if (NEVER_PURGE_TOMBSTONES)
logger.warn("You are running with -Dcassandra.never_purge_tombstones=true, this is dangerous!");
@@ -97,7 +118,7 @@
return;
if (this.overlappingSSTables != null)
- overlappingSSTables.release();
+ close();
if (compacting == null)
overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList());
@@ -228,6 +249,9 @@
{
if (overlappingSSTables != null)
overlappingSSTables.release();
+
+ FileUtils.closeQuietly(openDataFiles.values());
+ openDataFiles.clear();
}
public boolean compactingRepaired()
@@ -235,4 +259,38 @@
return !cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones() || compactingRepaired;
}
+ boolean provideTombstoneSources()
+ {
+ return tombstoneOption != TombstoneOption.NONE;
+ }
+
+ // caller must close iterators
+ public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly)
+ {
+ if (!provideTombstoneSources() || !compactingRepaired() || NEVER_PURGE_TOMBSTONES)
+ return null;
+ overlapIterator.update(key);
+ return Iterables.filter(Iterables.transform(overlapIterator.overlaps(),
+ reader -> getShadowIterator(reader, key, tombstoneOnly)),
+ Predicates.notNull());
+ }
+
+ @SuppressWarnings("resource") // caller to close
+ private UnfilteredRowIterator getShadowIterator(SSTableReader reader, DecoratedKey key, boolean tombstoneOnly)
+ {
+ if (reader.isMarkedSuspect() ||
+ reader.getMaxTimestamp() <= minTimestamp ||
+ tombstoneOnly && !reader.hasTombstones())
+ return null;
+ RowIndexEntry<?> position = reader.getPosition(key, SSTableReader.Operator.EQ);
+ if (position == null)
+ return null;
+ FileDataInput dfile = openDataFiles.computeIfAbsent(reader, this::openDataFile);
+ return reader.simpleIterator(dfile, key, position, tombstoneOnly);
+ }
+
+ private FileDataInput openDataFile(SSTableReader reader)
+ {
+ return limiter != null ? reader.openDataReader(limiter) : reader.openDataReader();
+ }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 0111aec..c4edfa6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -17,14 +17,13 @@
*/
package org.apache.cassandra.db.compaction;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.Ordering;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.PurgeFunction;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
@@ -33,6 +32,7 @@
import org.apache.cassandra.index.transactions.CompactionTransaction;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.metrics.CompactionMetrics;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
/**
* Merge multiple iterators over the content of sstable into a "compacted" iterator.
@@ -52,7 +52,6 @@
*/
public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator
{
- private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
private final OperationType type;
@@ -104,6 +103,7 @@
? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
: UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug
+ merged = Transformation.apply(merged, new GarbageSkipper(controller, nowInSec));
this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
}
@@ -313,4 +313,237 @@
return maxPurgeableTimestamp;
}
}
+
+ /**
+ * Unfiltered row iterator that removes deleted data as provided by a "tombstone source" for the partition.
+ * The result produced by this iterator is such that when merged with tombSource it produces the same output
+ * as the merge of dataSource and tombSource.
+ */
+ private static class GarbageSkippingUnfilteredRowIterator extends WrappingUnfilteredRowIterator
+ {
+ final UnfilteredRowIterator tombSource;
+ final DeletionTime partitionLevelDeletion;
+ final Row staticRow;
+ final ColumnFilter cf;
+ final int nowInSec;
+ final CFMetaData metadata;
+ final boolean cellLevelGC;
+
+ DeletionTime tombOpenDeletionTime = DeletionTime.LIVE;
+ DeletionTime dataOpenDeletionTime = DeletionTime.LIVE;
+ DeletionTime openDeletionTime = DeletionTime.LIVE;
+ DeletionTime partitionDeletionTime;
+ DeletionTime activeDeletionTime;
+ Unfiltered tombNext = null;
+ Unfiltered dataNext = null;
+ Unfiltered next = null;
+
+ /**
+ * Construct an iterator that filters out data shadowed by the provided "tombstone source".
+ *
+ * @param dataSource The input row. The result is a filtered version of this.
+ * @param tombSource Tombstone source, i.e. iterator used to identify deleted data in the input row.
+ * @param nowInSec Current time, used in choosing the winner when cell expiration is involved.
+ * @param cellLevelGC If false, the iterator will only look at row-level deletion times and tombstones.
+ * If true, deleted or overwritten cells within a surviving row will also be removed.
+ */
+ protected GarbageSkippingUnfilteredRowIterator(UnfilteredRowIterator dataSource, UnfilteredRowIterator tombSource, int nowInSec, boolean cellLevelGC)
+ {
+ super(dataSource);
+ this.tombSource = tombSource;
+ this.nowInSec = nowInSec;
+ this.cellLevelGC = cellLevelGC;
+ metadata = dataSource.metadata();
+ cf = ColumnFilter.all(metadata);
+
+ activeDeletionTime = partitionDeletionTime = tombSource.partitionLevelDeletion();
+
+ // Only preserve partition level deletion if not shadowed. (Note: Shadowing deletion must not be copied.)
+ this.partitionLevelDeletion = dataSource.partitionLevelDeletion().supersedes(tombSource.partitionLevelDeletion()) ?
+ dataSource.partitionLevelDeletion() :
+ DeletionTime.LIVE;
+
+ Row dataStaticRow = garbageFilterRow(dataSource.staticRow(), tombSource.staticRow());
+ this.staticRow = dataStaticRow != null ? dataStaticRow : Rows.EMPTY_STATIC_ROW;
+
+ tombNext = advance(tombSource);
+ dataNext = advance(dataSource);
+ }
+
+ private static Unfiltered advance(UnfilteredRowIterator source)
+ {
+ return source.hasNext() ? source.next() : null;
+ }
+
+ @Override
+ public DeletionTime partitionLevelDeletion()
+ {
+ return partitionLevelDeletion;
+ }
+
+ public void close()
+ {
+ super.close();
+ tombSource.close();
+ }
+
+ @Override
+ public Row staticRow()
+ {
+ return staticRow;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ // Produce the next element. This may consume multiple elements from both inputs until we find something
+ // from dataSource that is still live. We track the currently open deletion in both sources, as well as the
+ // one we have last issued to the output. The tombOpenDeletionTime is used to filter out content; the others
+ // to decide whether or not a tombstone is superseded, and to be able to surface (the rest of) a deletion
+ // range from the input when a suppressing deletion ends.
+ while (next == null && dataNext != null)
+ {
+ int cmp = tombNext == null ? -1 : metadata.comparator.compare(dataNext, tombNext);
+ if (cmp < 0)
+ {
+ if (dataNext.isRow())
+ next = ((Row) dataNext).filter(cf, activeDeletionTime, false, metadata);
+ else
+ next = processDataMarker();
+ }
+ else if (cmp == 0)
+ {
+ if (dataNext.isRow())
+ {
+ next = garbageFilterRow((Row) dataNext, (Row) tombNext);
+ }
+ else
+ {
+ tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext);
+ activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
+ tombOpenDeletionTime);
+ next = processDataMarker();
+ }
+ }
+ else // (cmp > 0)
+ {
+ if (tombNext.isRangeTombstoneMarker())
+ {
+ tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext);
+ activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
+ tombOpenDeletionTime);
+ boolean supersededBefore = openDeletionTime.isLive();
+ boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime);
+ // If a range open was not issued because it was superseded and the deletion isn't superseded any more, we need to open it now.
+ if (supersededBefore && !supersededAfter)
+ next = new RangeTombstoneBoundMarker(((RangeTombstoneMarker) tombNext).closeBound(false).invert(), dataOpenDeletionTime);
+ // If the deletion begins to be superseded, we don't close the range yet. This can save us a close/open pair if it ends after the superseding range.
+ }
+ }
+
+ if (next instanceof RangeTombstoneMarker)
+ openDeletionTime = updateOpenDeletionTime(openDeletionTime, next);
+
+ if (cmp <= 0)
+ dataNext = advance(wrapped);
+ if (cmp >= 0)
+ tombNext = advance(tombSource);
+ }
+ return next != null;
+ }
+
+ protected Row garbageFilterRow(Row dataRow, Row tombRow)
+ {
+ if (cellLevelGC)
+ {
+ return Rows.removeShadowedCells(dataRow, tombRow, activeDeletionTime, nowInSec);
+ }
+ else
+ {
+ DeletionTime deletion = Ordering.natural().max(tombRow.deletion().time(),
+ activeDeletionTime);
+ return dataRow.filter(cf, deletion, false, metadata);
+ }
+ }
+
+ /**
+ * Decide how to act on a tombstone marker from the input iterator. We can decide what to issue depending on
+ * whether or not the ranges before and after the marker are superseded/live -- if none are, we can reuse the
+ * marker; if both are, the marker can be ignored; otherwise we issue a corresponding start/end marker.
+ */
+ private RangeTombstoneMarker processDataMarker()
+ {
+ dataOpenDeletionTime = updateOpenDeletionTime(dataOpenDeletionTime, dataNext);
+ boolean supersededBefore = openDeletionTime.isLive();
+ boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime);
+ RangeTombstoneMarker marker = (RangeTombstoneMarker) dataNext;
+ if (!supersededBefore)
+ if (!supersededAfter)
+ return marker;
+ else
+ return new RangeTombstoneBoundMarker(marker.closeBound(false), marker.closeDeletionTime(false));
+ else
+ if (!supersededAfter)
+ return new RangeTombstoneBoundMarker(marker.openBound(false), marker.openDeletionTime(false));
+ else
+ return null;
+ }
+
+ @Override
+ public Unfiltered next()
+ {
+ if (!hasNext())
+ throw new IllegalStateException();
+
+ Unfiltered v = next;
+ next = null;
+ return v;
+ }
+
+ private DeletionTime updateOpenDeletionTime(DeletionTime openDeletionTime, Unfiltered next)
+ {
+ RangeTombstoneMarker marker = (RangeTombstoneMarker) next;
+ assert openDeletionTime.isLive() == !marker.isClose(false);
+ assert openDeletionTime.isLive() || openDeletionTime.equals(marker.closeDeletionTime(false));
+ return marker.isOpen(false) ? marker.openDeletionTime(false) : DeletionTime.LIVE;
+ }
+ }
+
+ /**
+ * Partition transformation applying GarbageSkippingUnfilteredRowIterator, obtaining tombstone sources for each
+ * partition using the controller's shadowSources method.
+ */
+ private static class GarbageSkipper extends Transformation<UnfilteredRowIterator>
+ {
+ final int nowInSec;
+ final CompactionController controller;
+ final boolean cellLevelGC;
+
+ private GarbageSkipper(CompactionController controller, int nowInSec)
+ {
+ this.controller = controller;
+ this.nowInSec = nowInSec;
+ cellLevelGC = controller.tombstoneOption == TombstoneOption.CELL;
+ }
+
+ @Override
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ Iterable<UnfilteredRowIterator> sources = controller.shadowSources(partition.partitionKey(), !cellLevelGC);
+ if (sources == null)
+ return partition;
+ List<UnfilteredRowIterator> iters = new ArrayList<>();
+ for (UnfilteredRowIterator iter : sources)
+ {
+ if (!iter.isEmpty())
+ iters.add(iter);
+ else
+ iter.close();
+ }
+ if (iters.isEmpty())
+ return partition;
+
+ return new GarbageSkippingUnfilteredRowIterator(partition, UnfilteredRowIterators.merge(iters, nowInSec), nowInSec, cellLevelGC);
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 519ff05..1cfc76b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -63,6 +63,7 @@
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
@@ -442,7 +443,7 @@
public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
{
List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals());
- Collections.sort(sortedSSTables, new SSTableReader.SizeComparator());
+ Collections.sort(sortedSSTables, SSTableReader.sizeComparator);
return sortedSSTables;
}
@@ -455,6 +456,42 @@
}, jobs, OperationType.CLEANUP);
}
+ public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
+ {
+ assert !cfStore.isIndex();
+
+ return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
+ {
+ @Override
+ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
+ {
+ Iterable<SSTableReader> originals = transaction.originals();
+ if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
+ originals = Iterables.filter(originals, SSTableReader::isRepaired);
+ List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
+ Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator);
+ return sortedSSTables;
+ }
+
+ @Override
+ public void execute(LifecycleTransaction txn) throws IOException
+ {
+ logger.debug("Garbage collecting {}", txn.originals());
+ CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
+ {
+ @Override
+ protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
+ {
+ return new CompactionController(cfStore, toCompact, gcBefore, getRateLimiter(), tombstoneOption);
+ }
+ };
+ task.setUserDefined(true);
+ task.setCompactionType(OperationType.GARBAGE_COLLECT);
+ task.execute(metrics);
+ }
+ }, jobs, OperationType.GARBAGE_COLLECT);
+ }
+
public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException
{
if (!cfs.getPartitioner().splitter().isPresent())
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index cfe0121..5442a2d 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -138,7 +138,7 @@
if (sstablesWithTombstones.isEmpty())
return Collections.emptyList();
- return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+ return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator));
}
private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables, long now, int base)
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index ec5e1d9..25c5d20 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -145,7 +145,17 @@
@Override
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
{
- throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions");
+ if (sstables.size() != 1)
+ throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions");
+
+ LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+ if (transaction == null)
+ {
+ logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
+ return null;
+ }
+ int level = sstables.iterator().next().getSSTableLevel();
+ return getCompactionTask(transaction, gcBefore, level == 0 ? Integer.MAX_VALUE : getMaxSSTableBytes());
}
@Override
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 84a34c9..27b8530 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -38,7 +38,8 @@
WRITE("Write"),
VIEW_BUILD("View build"),
INDEX_SUMMARY("Index summary redistribution"),
- RELOCATE("Relocate sstables to correct disk");
+ RELOCATE("Relocate sstables to correct disk"),
+ GARBAGE_COLLECT("Remove deleted data");
public final String type;
public final String fileName;
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index a9cb211..2cfc75d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -210,7 +210,8 @@
if (indexFile != null && dataStart != dataStartFromIndex)
outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataStartFromIndex));
- try (UnfilteredRowIterator iterator = withValidation(new RowMergingSSTableIterator(sstable, dataFile, key), dataFile.getPath()))
+ try (UnfilteredRowIterator iterator = withValidation(new RowMergingSSTableIterator(SSTableIdentityIterator.create(sstable, dataFile, key)),
+ dataFile.getPath()))
{
if (prevKey != null && prevKey.compareTo(key) > 0)
{
@@ -241,7 +242,7 @@
{
dataFile.seek(dataStartFromIndex);
- try (UnfilteredRowIterator iterator = withValidation(new SSTableIdentityIterator(sstable, dataFile, key), dataFile.getPath()))
+ try (UnfilteredRowIterator iterator = withValidation(SSTableIdentityIterator.create(sstable, dataFile, key), dataFile.getPath()))
{
if (prevKey != null && prevKey.compareTo(key) > 0)
{
@@ -471,38 +472,43 @@
*
* For more details, refer to CASSANDRA-12144.
*/
- private static class RowMergingSSTableIterator extends SSTableIdentityIterator
+ private static class RowMergingSSTableIterator extends WrappingUnfilteredRowIterator
{
- RowMergingSSTableIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+ Unfiltered nextToOffer = null;
+
+ RowMergingSSTableIterator(UnfilteredRowIterator source)
{
- super(sstable, file, key);
+ super(source);
}
@Override
- protected Unfiltered doCompute()
+ public boolean hasNext()
{
- if (!iterator.hasNext())
- return endOfData();
+ return nextToOffer != null || wrapped.hasNext();
+ }
- Unfiltered next = iterator.next();
- if (!next.isRow())
- return next;
+ @Override
+ public Unfiltered next()
+ {
+ Unfiltered next = nextToOffer != null ? nextToOffer : wrapped.next();
- while (iterator.hasNext())
+ if (next.isRow())
{
- Unfiltered peek = iterator.peek();
- // If there was a duplicate row, merge it.
- if (next.clustering().equals(peek.clustering()) && peek.isRow())
+ while (wrapped.hasNext())
{
- iterator.next(); // Make sure that the peeked item was consumed.
+ Unfiltered peek = wrapped.next();
+ if (!peek.isRow() || !next.clustering().equals(peek.clustering()))
+ {
+ nextToOffer = peek; // Offer peek in next call
+ return next;
+ }
+
+ // Duplicate row, merge it.
next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
}
- else
- {
- break;
- }
}
+ nextToOffer = null;
return next;
}
}
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 7cca4a7..8302a9b 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -101,8 +101,7 @@
if (sstablesWithTombstones.isEmpty())
return Collections.emptyList();
- Collections.sort(sstablesWithTombstones, new SSTableReader.SizeComparator());
- return Collections.singletonList(sstablesWithTombstones.get(0));
+ return Collections.singletonList(Collections.max(sstablesWithTombstones, SSTableReader.sizeComparator));
}
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 55daaa1..fd53930 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -143,7 +143,7 @@
if (sstablesWithTombstones.isEmpty())
return Collections.emptyList();
- return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+ return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator));
}
private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
@@ -314,7 +314,7 @@
List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
// Trim the largest sstables off the end to meet the maxThreshold
- Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+ Collections.sort(ssTableReaders, SSTableReader.sizeComparator);
return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
}
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 91c7ad7..17b1187 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -187,7 +187,7 @@
markAndThrow();
//mimic the scrub read path
- try (UnfilteredRowIterator iterator = new SSTableIdentityIterator(sstable, dataFile, key))
+ try (UnfilteredRowIterator iterator = SSTableIdentityIterator.create(sstable, dataFile, key))
{
}
diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java
index 54df26e..38bde16 100644
--- a/src/java/org/apache/cassandra/db/rows/Cells.java
+++ b/src/java/org/apache/cassandra/db/rows/Cells.java
@@ -233,6 +233,86 @@
return timeDelta;
}
+ /**
+ * Adds to the builder a representation of the given existing cell that, when merged/reconciled with the given
+ * update cell, produces the same result as merging the original with the update.
+ * <p>
+ * For simple cells that is either the original cell (if still live), or nothing (if shadowed).
+ *
+ * @param existing the pre-existing cell, the one that is updated.
+ * @param update the newly added cell, the update. This can be {@code null} out
+ * of convenience, in which case this function simply copy {@code existing} to
+ * {@code writer}.
+ * @param deletion the deletion time that applies to the cells being considered.
+ * This deletion time may delete both {@code existing} or {@code update}.
+ * @param builder the row builder to which the result of the filtering is written.
+ * @param nowInSec the current time in seconds (which plays a role during reconciliation
+ * because deleted cells always have precedence on timestamp equality and deciding if a
+ * cell is a live or not depends on the current time due to expiring cells).
+ */
+ public static void addNonShadowed(Cell existing,
+ Cell update,
+ DeletionTime deletion,
+ Row.Builder builder,
+ int nowInSec)
+ {
+ if (deletion.deletes(existing))
+ return;
+
+ Cell reconciled = reconcile(existing, update, nowInSec);
+ if (reconciled != update)
+ builder.addCell(existing);
+ }
+
+ /**
+ * Adds to the builder a representation of the given existing cell that, when merged/reconciled with the given
+ * update cell, produces the same result as merging the original with the update.
+ * <p>
+ * For simple cells that is either the original cell (if still live), or nothing (if shadowed).
+ *
+ * @param column the complex column the cells are for.
+ * @param existing the pre-existing cells, the ones that are updated.
+ * @param update the newly added cells, the update. This can be {@code null} out
+ * of convenience, in which case this function simply copy the cells from
+ * {@code existing} to {@code writer}.
+ * @param deletion the deletion time that applies to the cells being considered.
+ * This deletion time may delete both {@code existing} or {@code update}.
+ * @param builder the row builder to which the result of the filtering is written.
+ * @param nowInSec the current time in seconds (which plays a role during reconciliation
+ * because deleted cells always have precedence on timestamp equality and deciding if a
+ * cell is a live or not depends on the current time due to expiring cells).
+ */
+ public static void addNonShadowedComplex(ColumnDefinition column,
+ Iterator<Cell> existing,
+ Iterator<Cell> update,
+ DeletionTime deletion,
+ Row.Builder builder,
+ int nowInSec)
+ {
+ Comparator<CellPath> comparator = column.cellPathComparator();
+ Cell nextExisting = getNext(existing);
+ Cell nextUpdate = getNext(update);
+ while (nextExisting != null)
+ {
+ int cmp = nextUpdate == null ? -1 : comparator.compare(nextExisting.path(), nextUpdate.path());
+ if (cmp < 0)
+ {
+ addNonShadowed(nextExisting, null, deletion, builder, nowInSec);
+ nextExisting = getNext(existing);
+ }
+ else if (cmp == 0)
+ {
+ addNonShadowed(nextExisting, nextUpdate, deletion, builder, nowInSec);
+ nextExisting = getNext(existing);
+ nextUpdate = getNext(update);
+ }
+ else
+ {
+ nextUpdate = getNext(update);
+ }
+ }
+ }
+
private static Cell getNext(Iterator<Cell> iterator)
{
return iterator == null || !iterator.hasNext() ? null : iterator.next();
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index 4f6c8d2..e6d9062 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -25,6 +25,7 @@
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.db.rows.Row.Deletion;
import org.apache.cassandra.utils.MergeIterator;
import org.apache.cassandra.utils.WrappedInt;
@@ -311,4 +312,80 @@
}
return timeDelta;
}
+
+ /**
+ * Returns a row that is obtained from the given existing row by removing everything that is shadowed by data in
+ * the update row. In other words, produces the smallest result row such that
+ * {@code merge(result, update, nowInSec) == merge(existing, update, nowInSec)} after filtering by rangeDeletion.
+ *
+ * @param existing source row
+ * @param update shadowing row
+ * @param rangeDeletion extra {@code DeletionTime} from covering tombstone
+ * @param nowInSec the current time in seconds (which plays a role during reconciliation
+ * because deleted cells always have precedence on timestamp equality and deciding if a
+ * cell is live or not depends on the current time due to expiring cells).
+ */
+ public static Row removeShadowedCells(Row existing, Row update, DeletionTime rangeDeletion, int nowInSec)
+ {
+ Row.Builder builder = BTreeRow.sortedBuilder();
+ Clustering clustering = existing.clustering();
+ builder.newRow(clustering);
+
+ DeletionTime deletion = update.deletion().time();
+ if (rangeDeletion.supersedes(deletion))
+ deletion = rangeDeletion;
+
+ LivenessInfo existingInfo = existing.primaryKeyLivenessInfo();
+ if (!deletion.deletes(existingInfo))
+ builder.addPrimaryKeyLivenessInfo(existingInfo);
+ Row.Deletion rowDeletion = existing.deletion();
+ if (!deletion.supersedes(rowDeletion.time()))
+ builder.addRowDeletion(rowDeletion);
+
+ Iterator<ColumnData> a = existing.iterator();
+ Iterator<ColumnData> b = update.iterator();
+ ColumnData nexta = a.hasNext() ? a.next() : null, nextb = b.hasNext() ? b.next() : null;
+ while (nexta != null)
+ {
+ int comparison = nextb == null ? -1 : nexta.column.compareTo(nextb.column);
+ if (comparison <= 0)
+ {
+ ColumnData cura = nexta;
+ ColumnDefinition column = cura.column;
+ ColumnData curb = comparison == 0 ? nextb : null;
+ if (column.isSimple())
+ {
+ Cells.addNonShadowed((Cell) cura, (Cell) curb, deletion, builder, nowInSec);
+ }
+ else
+ {
+ ComplexColumnData existingData = (ComplexColumnData) cura;
+ ComplexColumnData updateData = (ComplexColumnData) curb;
+
+ DeletionTime existingDt = existingData.complexDeletion();
+ DeletionTime updateDt = updateData == null ? DeletionTime.LIVE : updateData.complexDeletion();
+
+ DeletionTime maxDt = updateDt.supersedes(deletion) ? updateDt : deletion;
+ if (existingDt.supersedes(maxDt))
+ {
+ builder.addComplexDeletion(column, existingDt);
+ maxDt = existingDt;
+ }
+
+ Iterator<Cell> existingCells = existingData.iterator();
+ Iterator<Cell> updateCells = updateData == null ? null : updateData.iterator();
+ Cells.addNonShadowedComplex(column, existingCells, updateCells, maxDt, builder, nowInSec);
+ }
+ nexta = a.hasNext() ? a.next() : null;
+ if (curb != null)
+ nextb = b.hasNext() ? b.next() : null;
+ }
+ else
+ {
+ nextb = b.hasNext() ? b.next() : null;
+ }
+ }
+ Row row = builder.build();
+ return row != null && !row.isEmpty() ? row : null;
+ }
}
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index ed6bd12..db18859 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -24,9 +24,11 @@
import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Row.Deletion;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.WrappedException;
@@ -450,6 +452,58 @@
}
}
+ public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeader header, SerializationHelper helper)
+ throws IOException
+ {
+ while (true)
+ {
+ int flags = in.readUnsignedByte();
+ if (isEndOfPartition(flags))
+ return null;
+
+ int extendedFlags = readExtendedFlags(in, flags);
+
+ if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+ {
+ ClusteringBoundOrBoundary bound = ClusteringBoundOrBoundary.serializer.deserialize(in, helper.version, header.clusteringTypes());
+ return deserializeMarkerBody(in, header, bound);
+ }
+ else
+ {
+ assert !isStatic(extendedFlags); // deserializeStaticRow should be used for that.
+ if ((flags & HAS_DELETION) != 0)
+ {
+ assert header.isForSSTable();
+ boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
+ boolean hasTTL = (flags & HAS_TTL) != 0;
+ boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
+ Clustering clustering = Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes());
+ long nextPosition = in.readUnsignedVInt() + in.getFilePointer();
+ in.readUnsignedVInt(); // skip previous unfiltered size
+ if (hasTimestamp)
+ {
+ header.readTimestamp(in);
+ if (hasTTL)
+ {
+ header.readTTL(in);
+ header.readLocalDeletionTime(in);
+ }
+ }
+
+ Deletion deletion = new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable);
+ in.seek(nextPosition);
+ return BTreeRow.emptyDeletedRow(clustering, deletion);
+ }
+ else
+ {
+ Clustering.serializer.skip(in, helper.version, header.clusteringTypes());
+ skipRowBody(in);
+ // Continue with next item.
+ }
+ }
+ }
+ }
+
public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper)
throws IOException
{
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index de8d69b..1173d40 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -96,7 +96,7 @@
dataFile.seek(indexEntry.position);
ByteBufferUtil.readWithShortLength(dataFile); // key
- try (SSTableIdentityIterator partition = new SSTableIdentityIterator(sstable, dataFile, key))
+ try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key))
{
// if the row has statics attached, it has to be indexed separately
indexWriter.nextUnfilteredCluster(partition.staticRow());
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index a5af334..2a79f88 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -19,15 +19,15 @@
import java.io.*;
-import org.apache.cassandra.utils.AbstractIterator;
-
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
-public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implements Comparable<SSTableIdentityIterator>, UnfilteredRowIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, UnfilteredRowIterator
{
private final SSTableReader sstable;
private final DecoratedKey key;
@@ -37,29 +37,51 @@
protected final SSTableSimpleIterator iterator;
private final Row staticRow;
- /**
- * Used to iterate through the columns of a row.
- * @param sstable SSTable we are reading ffrom.
- * @param file Reading using this file.
- * @param key Key of this row.
- */
- public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+ public SSTableIdentityIterator(SSTableReader sstable, DecoratedKey key, DeletionTime partitionLevelDeletion,
+ String filename, SSTableSimpleIterator iterator) throws IOException
{
+ super();
this.sstable = sstable;
- this.filename = file.getPath();
this.key = key;
+ this.partitionLevelDeletion = partitionLevelDeletion;
+ this.filename = filename;
+ this.iterator = iterator;
+ this.staticRow = iterator.readStaticRow();
+ }
+ public static SSTableIdentityIterator create(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+ {
try
{
- this.partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
+ DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
SerializationHelper helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
- this.iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion);
- this.staticRow = iterator.readStaticRow();
+ SSTableSimpleIterator iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion);
+ return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, file.getPath(), iterator);
}
catch (IOException e)
{
sstable.markSuspect();
- throw new CorruptSSTableException(e, filename);
+ throw new CorruptSSTableException(e, file.getPath());
+ }
+ }
+
+ public static SSTableIdentityIterator create(SSTableReader sstable, FileDataInput dfile, RowIndexEntry<?> indexEntry, DecoratedKey key, boolean tombstoneOnly)
+ {
+ try
+ {
+ dfile.seek(indexEntry.position);
+ ByteBufferUtil.skipShortLength(dfile); // Skip partition key
+ DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(dfile);
+ SerializationHelper helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
+ SSTableSimpleIterator iterator = tombstoneOnly
+ ? SSTableSimpleIterator.createTombstoneOnly(sstable.metadata, dfile, sstable.header, helper, partitionLevelDeletion)
+ : SSTableSimpleIterator.create(sstable.metadata, dfile, sstable.header, helper, partitionLevelDeletion);
+ return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, dfile.getPath(), iterator);
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, dfile.getPath());
}
}
@@ -93,7 +115,32 @@
return staticRow;
}
- protected Unfiltered computeNext()
+ public boolean hasNext()
+ {
+ try
+ {
+ return iterator.hasNext();
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, filename);
+ }
+ catch (IOError e)
+ {
+ if (e.getCause() instanceof IOException)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException((Exception)e.getCause(), filename);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+
+ public Unfiltered next()
{
try
{
@@ -120,7 +167,7 @@
protected Unfiltered doCompute()
{
- return iterator.hasNext() ? iterator.next() : endOfData();
+ return iterator.next();
}
public void close()
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 2d4314e..ce42126 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -30,6 +30,7 @@
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.net.MessagingService;
/**
@@ -59,6 +60,14 @@
return new CurrentFormatIterator(metadata, in, header, helper);
}
+ public static SSTableSimpleIterator createTombstoneOnly(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion)
+ {
+ if (helper.version < MessagingService.VERSION_30)
+ return new OldFormatTombstoneIterator(metadata, in, helper, partitionDeletion);
+ else
+ return new CurrentFormatTombstoneIterator(metadata, in, header, helper);
+ }
+
public abstract Row readStaticRow() throws IOException;
private static class CurrentFormatIterator extends SSTableSimpleIterator
@@ -93,6 +102,41 @@
}
}
+ private static class CurrentFormatTombstoneIterator extends SSTableSimpleIterator
+ {
+ private final SerializationHeader header;
+
+ private CurrentFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper)
+ {
+ super(metadata, in, helper);
+ this.header = header;
+ }
+
+ public Row readStaticRow() throws IOException
+ {
+ if (header.hasStatic())
+ {
+ Row staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, helper);
+ if (!staticRow.deletion().isLive())
+ return BTreeRow.emptyDeletedRow(staticRow.clustering(), staticRow.deletion());
+ }
+ return Rows.EMPTY_STATIC_ROW;
+ }
+
+ protected Unfiltered computeNext()
+ {
+ try
+ {
+ Unfiltered unfiltered = UnfilteredSerializer.serializer.deserializeTombstonesOnly((FileDataInput) in, header, helper);
+ return unfiltered == null ? endOfData() : unfiltered;
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ }
+
private static class OldFormatIterator extends SSTableSimpleIterator
{
private final UnfilteredDeserializer deserializer;
@@ -163,4 +207,35 @@
}
+ private static class OldFormatTombstoneIterator extends OldFormatIterator
+ {
+ private OldFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper, DeletionTime partitionDeletion)
+ {
+ super(metadata, in, helper, partitionDeletion);
+ }
+
+ public Row readStaticRow() throws IOException
+ {
+ Row row = super.readStaticRow();
+ if (!row.deletion().isLive())
+ return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion());
+ return Rows.EMPTY_STATIC_ROW;
+ }
+
+ protected Unfiltered computeNext()
+ {
+ while (true)
+ {
+ Unfiltered unfiltered = super.computeNext();
+ if (unfiltered == null || unfiltered.isRangeTombstoneMarker())
+ return unfiltered;
+
+ Row row = (Row) unfiltered;
+ if (!row.deletion().isLive())
+ return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion());
+ // Otherwise read next.
+ }
+ }
+
+ }
}
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index d26edfa..32d3156 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -167,6 +167,14 @@
public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
+ public static final Comparator<SSTableReader> sizeComparator = new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
+ }
+ };
+
/**
* maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
* to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
@@ -1529,6 +1537,8 @@
public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
+ public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly);
+
/**
* Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
*/
@@ -2016,14 +2026,6 @@
return 0;
}
- public static class SizeComparator implements Comparator<SSTableReader>
- {
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
- }
- }
-
public EncodingStats stats()
{
// We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 7a7ce8c..8c64b01 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -30,13 +30,11 @@
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
@@ -120,6 +118,13 @@
}
+ @SuppressWarnings("resource") // caller to close
+ @Override
+ public UnfilteredRowIterator simpleIterator(FileDataInput dfile, DecoratedKey key, RowIndexEntry position, boolean tombstoneOnly)
+ {
+ return SSTableIdentityIterator.create(this, dfile, position, key, tombstoneOnly);
+ }
+
/**
* @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
* allow key selection by token bounds but only if op != * EQ
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 6d31844..66213a6 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -331,7 +331,7 @@
{
dfile.seek(currentEntry.position);
ByteBufferUtil.skipShortLength(dfile); // key
- return new SSTableIdentityIterator(sstable, dfile, partitionKey());
+ return SSTableIdentityIterator.create(sstable, dfile, partitionKey());
}
ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey());
diff --git a/src/java/org/apache/cassandra/schema/CompactionParams.java b/src/java/org/apache/cassandra/schema/CompactionParams.java
index 720efa3..73271f1 100644
--- a/src/java/org/apache/cassandra/schema/CompactionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompactionParams.java
@@ -45,7 +45,8 @@
CLASS,
ENABLED,
MIN_THRESHOLD,
- MAX_THRESHOLD;
+ MAX_THRESHOLD,
+ PROVIDE_OVERLAPPING_TOMBSTONES;
@Override
public String toString()
@@ -54,27 +55,38 @@
}
}
+ public enum TombstoneOption
+ {
+ NONE,
+ ROW,
+ CELL;
+ }
+
public static final int DEFAULT_MIN_THRESHOLD = 4;
public static final int DEFAULT_MAX_THRESHOLD = 32;
public static final boolean DEFAULT_ENABLED = true;
+ public static final TombstoneOption DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES =
+ TombstoneOption.valueOf(System.getProperty("default.provide.overlapping.tombstones", TombstoneOption.NONE.toString()).toUpperCase());
public static final Map<String, String> DEFAULT_THRESHOLDS =
ImmutableMap.of(Option.MIN_THRESHOLD.toString(), Integer.toString(DEFAULT_MIN_THRESHOLD),
Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD));
public static final CompactionParams DEFAULT =
- new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED);
+ new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED, DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES);
private final Class<? extends AbstractCompactionStrategy> klass;
private final ImmutableMap<String, String> options;
private final boolean isEnabled;
+ private final TombstoneOption tombstoneOption;
- private CompactionParams(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled)
+ private CompactionParams(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled, TombstoneOption tombstoneOption)
{
this.klass = klass;
this.options = ImmutableMap.copyOf(options);
this.isEnabled = isEnabled;
+ this.tombstoneOption = tombstoneOption;
}
public static CompactionParams create(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options)
@@ -82,6 +94,8 @@
boolean isEnabled = options.containsKey(Option.ENABLED.toString())
? Boolean.parseBoolean(options.get(Option.ENABLED.toString()))
: DEFAULT_ENABLED;
+ TombstoneOption tombstoneOption = TombstoneOption.valueOf(options.getOrDefault(Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString(),
+ DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES.toString()).toUpperCase());
Map<String, String> allOptions = new HashMap<>(options);
if (supportsThresholdParams(klass))
@@ -90,7 +104,7 @@
allOptions.putIfAbsent(Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD));
}
- return new CompactionParams(klass, allOptions, isEnabled);
+ return new CompactionParams(klass, allOptions, isEnabled, tombstoneOption);
}
public static CompactionParams scts(Map<String, String> options)
@@ -119,6 +133,11 @@
: Integer.parseInt(threshold);
}
+ public TombstoneOption tombstoneOption()
+ {
+ return tombstoneOption;
+ }
+
public void validate()
{
try
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d64fc04..0a35296 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -75,6 +75,7 @@
import org.apache.cassandra.net.*;
import org.apache.cassandra.repair.*;
import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.service.paxos.CommitVerbHandler;
import org.apache.cassandra.service.paxos.PrepareVerbHandler;
@@ -2809,6 +2810,19 @@
return status.statusCode;
}
+ public int garbageCollect(String tombstoneOptionString, int jobs, String keyspaceName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ TombstoneOption tombstoneOption = TombstoneOption.valueOf(tombstoneOptionString);
+ CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
+ for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
+ {
+ CompactionManager.AllSSTableOpStatus oneStatus = cfs.garbageCollect(tombstoneOption, jobs);
+ if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
+ status = oneStatus;
+ }
+ return status.statusCode;
+ }
+
/**
* Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified.
*
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index f7da817..abb10c1 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -290,6 +290,12 @@
public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
/**
+ * Rewrites all sstables from the given tables to remove deleted data.
+ * The tombstone option defines the granularity of the procedure: ROW removes deleted partitions and rows, CELL also removes overwritten or deleted cells.
+ */
+ public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+
+ /**
* Flush all memtables for the given column families, or all columnfamilies for the given keyspace
* if none are explicitly listed.
* @param keyspaceName
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 84eeb04..c33dfa4 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -73,6 +73,7 @@
import org.apache.cassandra.metrics.ThreadPoolMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.CacheServiceMBean;
import org.apache.cassandra.service.GCInspector;
@@ -259,6 +260,11 @@
return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames);
}
+ public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ {
+ return ssProxy.garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames);
+ }
+
private void checkJobs(PrintStream out, int jobs)
{
if (jobs > DatabaseDescriptor.getConcurrentCompactors())
@@ -301,7 +307,16 @@
if (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames) != 0)
{
failed = true;
- out.println("Aborted upgrading sstables for atleast one table in keyspace "+keyspaceName+", check server logs for more information.");
+ out.println("Aborted upgrading sstables for at least one table in keyspace " + keyspaceName + ", check server logs for more information.");
+ }
+ }
+
+ public void garbageCollect(PrintStream out, String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ {
+ if (garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames) != 0)
+ {
+ failed = true;
+ out.println("Aborted garbage collection for at least one table in keyspace " + keyspaceName + ", check server logs for more information.");
}
}
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 8640b58..cde4ee5 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -64,6 +64,7 @@
Verify.class,
Flush.class,
UpgradeSSTable.class,
+ GarbageCollect.class,
DisableAutoCompaction.class,
EnableAutoCompaction.class,
CompactionStats.class,
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
new file mode 100644
index 0000000..37daf09
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "garbagecollect", description = "Remove deleted data from one or more tables")
+public class GarbageCollect extends NodeToolCmd
+{
+ @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+ private List<String> args = new ArrayList<>();
+
+ @Option(title = "granularity",
+ name = {"-g", "--granularity"},
+ allowedValues = {"ROW", "CELL"},
+ description = "Granularity of garbage removal. ROW (default) removes deleted partitions and rows, CELL also removes overwritten or deleted cells.")
+ private String tombstoneOption = "ROW";
+
+ @Option(title = "jobs",
+ name = {"-j", "--jobs"},
+ description = "Number of sstables to clean up simultaneously, set to 0 to use all available compaction threads")
+ private int jobs = 2;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] tableNames = parseOptionalTables(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.garbageCollect(System.out, tombstoneOption, jobs, keyspace, tableNames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during garbage collection", e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/long/org/apache/cassandra/cql3/GcCompactionBench.java b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
new file mode 100644
index 0000000..ca39b55
--- /dev/null
+++ b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Iterables;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class GcCompactionBench extends CQLTester
+{
+ private static final String SIZE_TIERED_STRATEGY = "SizeTieredCompactionStrategy', 'min_sstable_size' : '0";
+ private static final String LEVELED_STRATEGY = "LeveledCompactionStrategy', 'sstable_size_in_mb' : '16";
+
+ private static final int DEL_SECTIONS = 1000;
+ private static final int FLUSH_FREQ = 10000;
+ private static final int RANGE_FREQUENCY_INV = 16;
+ static final int COUNT = 90000;
+ static final int ITERS = 9;
+
+ static final int KEY_RANGE = 10;
+ static final int CLUSTERING_RANGE = 210000;
+
+ static final int EXTRA_SIZE = 1025;
+
+ // The name of this method is important!
+ // CommitLog settings must be applied before CQLTester sets up; by using the same name as its @BeforeClass method we
+ // are effectively overriding it.
+ @BeforeClass
+ public static void setUpClass() // overrides CQLTester.setUpClass()
+ {
+ DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(100);
+ CQLTester.setUpClass();
+ }
+
+ String hashQuery;
+
+ @Before
+ public void before() throws Throwable
+ {
+ createTable("CREATE TABLE %s(" +
+ " key int," +
+ " column int," +
+ " data int," +
+ " extra text," +
+ " PRIMARY KEY(key, column)" +
+ ")"
+ );
+
+ String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, int",
+ " CREATE FUNCTION %s (state int, val int)" +
+ " CALLED ON NULL INPUT" +
+ " RETURNS int" +
+ " LANGUAGE java" +
+ " AS 'return val != null ? state * 17 + val : state;'")).name;
+ String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, text",
+ " CREATE FUNCTION %s (state int, val text)" +
+ " CALLED ON NULL INPUT" +
+ " RETURNS int" +
+ " LANGUAGE java" +
+ " AS 'return val != null ? state * 17 + val.hashCode() : state;'")).name;
+
+ String hashInt = createAggregate(KEYSPACE, "int",
+ " CREATE AGGREGATE %s (int)" +
+ " SFUNC " + hashIFunc +
+ " STYPE int" +
+ " INITCOND 1");
+ String hashText = createAggregate(KEYSPACE, "text",
+ " CREATE AGGREGATE %s (text)" +
+ " SFUNC " + hashTFunc +
+ " STYPE int" +
+ " INITCOND 1");
+
+ hashQuery = String.format("SELECT count(column), %s(key), %s(column), %s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s",
+ hashInt, hashInt, hashInt, hashText);
+ }
+ AtomicLong id = new AtomicLong();
+ long compactionTimeNanos = 0;
+
+ void pushData(Random rand, int count) throws Throwable
+ {
+ for (int i = 0; i < count; ++i)
+ {
+ long ii = id.incrementAndGet();
+ if (ii % 1000 == 0)
+ System.out.print('.');
+ int key = rand.nextInt(KEY_RANGE);
+ int column = rand.nextInt(CLUSTERING_RANGE);
+ execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", key, column, (int) ii, genExtra(rand));
+ maybeCompact(ii);
+ }
+ }
+
+ private String genExtra(Random rand)
+ {
+ StringBuilder builder = new StringBuilder(EXTRA_SIZE);
+ for (int i = 0; i < EXTRA_SIZE; ++i)
+ builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1)));
+ return builder.toString();
+ }
+
+ void deleteData(Random rand, int count) throws Throwable
+ {
+ for (int i = 0; i < count; ++i)
+ {
+ int key;
+ UntypedResultSet res;
+ long ii = id.incrementAndGet();
+ if (ii % 1000 == 0)
+ System.out.print('-');
+ if (rand.nextInt(RANGE_FREQUENCY_INV) != 1)
+ {
+ do
+ {
+ key = rand.nextInt(KEY_RANGE);
+ long cid = rand.nextInt(DEL_SECTIONS);
+ int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
+ int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
+ res = execute("SELECT column FROM %s WHERE key = ? AND column >= ? AND column < ? LIMIT 1", key, cstart, cend);
+ } while (res.size() == 0);
+ UntypedResultSet.Row r = Iterables.get(res, rand.nextInt(res.size()));
+ int clustering = r.getInt("column");
+ execute("DELETE FROM %s WHERE key = ? AND column = ?", key, clustering);
+ }
+ else
+ {
+ key = rand.nextInt(KEY_RANGE);
+ long cid = rand.nextInt(DEL_SECTIONS);
+ int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
+ int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
+ res = execute("DELETE FROM %s WHERE key = ? AND column >= ? AND column < ?", key, cstart, cend);
+ }
+ maybeCompact(ii);
+ }
+ }
+
+ private void maybeCompact(long ii)
+ {
+ if (ii % FLUSH_FREQ == 0)
+ {
+ System.out.print("F");
+ flush();
+ if (ii % (FLUSH_FREQ * 10) == 0)
+ {
+ System.out.println("C");
+ long startTime = System.nanoTime();
+ getCurrentColumnFamilyStore().enableAutoCompaction(true);
+ long endTime = System.nanoTime();
+ compactionTimeNanos += endTime - startTime;
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ }
+ }
+ }
+
+ public void testGcCompaction(TombstoneOption tombstoneOption, TombstoneOption backgroundTombstoneOption, String compactionClass) throws Throwable
+ {
+ id.set(0);
+ compactionTimeNanos = 0;
+ alterTable("ALTER TABLE %s WITH compaction = { 'class' : '" + compactionClass + "', 'provide_overlapping_tombstones' : '" + backgroundTombstoneOption + "' };");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ cfs.disableAutoCompaction();
+
+ long onStartTime = System.currentTimeMillis();
+ ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ List<Future<?>> tasks = new ArrayList<>();
+ for (int ti = 0; ti < 1; ++ti)
+ {
+ Random rand = new Random(ti);
+ tasks.add(es.submit(() ->
+ {
+ for (int i = 0; i < ITERS; ++i)
+ try
+ {
+ pushData(rand, COUNT);
+ deleteData(rand, COUNT / 3);
+ }
+ catch (Throwable e)
+ {
+ throw new AssertionError(e);
+ }
+ }));
+ }
+ for (Future<?> task : tasks)
+ task.get();
+
+ flush();
+ long onEndTime = System.currentTimeMillis();
+ int startRowCount = countRows(cfs);
+ int startTombCount = countTombstoneMarkers(cfs);
+ int startRowDeletions = countRowDeletions(cfs);
+ int startTableCount = cfs.getLiveSSTables().size();
+ long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+ System.out.println();
+
+ String hashesBefore = getHashes();
+
+ long startTime = System.currentTimeMillis();
+ CompactionManager.instance.performGarbageCollection(cfs, tombstoneOption, 0);
+ long endTime = System.currentTimeMillis();
+
+ int endRowCount = countRows(cfs);
+ int endTombCount = countTombstoneMarkers(cfs);
+ int endRowDeletions = countRowDeletions(cfs);
+ int endTableCount = cfs.getLiveSSTables().size();
+ long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+
+ System.out.println(cfs.getCompactionParametersJson());
+ System.out.println(String.format("%s compactions completed in %.3fs",
+ tombstoneOption.toString(), (endTime - startTime) * 1e-3));
+ System.out.println(String.format("Operations completed in %.3fs, out of which %.3f for ongoing " + backgroundTombstoneOption + " background compactions",
+ (onEndTime - onStartTime) * 1e-3, compactionTimeNanos * 1e-9));
+ System.out.println(String.format("At start: %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers",
+ startTableCount, startSize, startRowCount, startRowDeletions, startTombCount));
+ System.out.println(String.format("At end: %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers",
+ endTableCount, endSize, endRowCount, endRowDeletions, endTombCount));
+
+ String hashesAfter = getHashes();
+ Assert.assertEquals(hashesBefore, hashesAfter);
+ }
+
+ private String getHashes() throws Throwable
+ {
+ long startTime = System.currentTimeMillis();
+ String hashes = Arrays.toString(getRows(execute(hashQuery))[0]);
+ long endTime = System.currentTimeMillis();
+ System.out.println(String.format("Hashes: %s, retrieved in %.3fs", hashes, (endTime - startTime) * 1e-3));
+ return hashes;
+ }
+
+ @Test
+ public void testCellAtEnd() throws Throwable
+ {
+ testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, LEVELED_STRATEGY);
+ }
+
+    @Test
+    public void testRowAtEnd() throws Throwable
+    {
+        // Was a copy-paste duplicate of testCellAtEnd (TombstoneOption.CELL); a ROW-named
+        // test must exercise ROW granularity, matching testRowAtEndSizeTiered below.
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+ @Test
+ public void testCellThroughout() throws Throwable
+ {
+ testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, LEVELED_STRATEGY);
+ }
+
+ @Test
+ public void testRowThroughout() throws Throwable
+ {
+ testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, LEVELED_STRATEGY);
+ }
+
+ @Test
+ public void testCopyCompaction() throws Throwable
+ {
+ testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, LEVELED_STRATEGY);
+ }
+
+ @Test
+ public void testCellAtEndSizeTiered() throws Throwable
+ {
+ testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+ }
+
+ @Test
+ public void testRowAtEndSizeTiered() throws Throwable
+ {
+ testGcCompaction(TombstoneOption.ROW, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+ }
+
+ @Test
+ public void testCellThroughoutSizeTiered() throws Throwable
+ {
+ testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, SIZE_TIERED_STRATEGY);
+ }
+
+ @Test
+ public void testRowThroughoutSizeTiered() throws Throwable
+ {
+ testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, SIZE_TIERED_STRATEGY);
+ }
+
+ @Test
+ public void testCopyCompactionSizeTiered() throws Throwable
+ {
+ testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+ }
+
+ int countTombstoneMarkers(ColumnFamilyStore cfs)
+ {
+ return count(cfs, x -> x.isRangeTombstoneMarker());
+ }
+
+ int countRowDeletions(ColumnFamilyStore cfs)
+ {
+ return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive());
+ }
+
+ int countRows(ColumnFamilyStore cfs)
+ {
+ int nowInSec = FBUtilities.nowInSeconds();
+ return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec));
+ }
+
+ private int count(ColumnFamilyStore cfs, Predicate<Unfiltered> predicate)
+ {
+ int count = 0;
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ count += count(reader, predicate);
+ return count;
+ }
+
+ int count(SSTableReader reader, Predicate<Unfiltered> predicate)
+ {
+ int instances = 0;
+ try (ISSTableScanner partitions = reader.getScanner())
+ {
+ while (partitions.hasNext())
+ {
+ try (UnfilteredRowIterator iter = partitions.next())
+ {
+ while (iter.hasNext())
+ {
+ Unfiltered atom = iter.next();
+ if (predicate.test(atom))
+ ++instances;
+ }
+ }
+ }
+ }
+ return instances;
+ }
+}
diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
new file mode 100644
index 0000000..6fed033
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class GcCompactionTest extends CQLTester
+{
+ static final int KEY_COUNT = 10;
+ static final int CLUSTERING_COUNT = 20;
+
+ @Test
+ public void testGcCompactionPartitions() throws Throwable
+ {
+ runCompactionTest("CREATE TABLE %s(" +
+ " key int," +
+ " column int," +
+ " data int," +
+ " extra text," +
+ " PRIMARY KEY((key, column), data)" +
+ ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row' };"
+ );
+
+ }
+
+ @Test
+ public void testGcCompactionRows() throws Throwable
+ {
+ runCompactionTest("CREATE TABLE %s(" +
+ " key int," +
+ " column int," +
+ " data int," +
+ " extra text," +
+ " PRIMARY KEY(key, column)" +
+ ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row' };"
+ );
+
+ }
+
+ @Test
+ public void testGcCompactionRanges() throws Throwable
+ {
+
+ runCompactionTest("CREATE TABLE %s(" +
+ " key int," +
+ " column int," +
+ " col2 int," +
+ " data int," +
+ " extra text," +
+ " PRIMARY KEY(key, column, data)" +
+ ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row' };"
+ );
+ }
+
+ private void runCompactionTest(String tableDef) throws Throwable
+ {
+ createTable(tableDef);
+
+ for (int i = 0; i < KEY_COUNT; ++i)
+ for (int j = 0; j < CLUSTERING_COUNT; ++j)
+ execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+ Set<SSTableReader> readers = new HashSet<>();
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+ flush();
+ assertEquals(1, cfs.getLiveSSTables().size());
+ SSTableReader table0 = getNewTable(readers);
+ assertEquals(0, countTombstoneMarkers(table0));
+ int rowCount = countRows(table0);
+
+ deleteWithSomeInserts(3, 5, 10);
+ flush();
+ assertEquals(2, cfs.getLiveSSTables().size());
+ SSTableReader table1 = getNewTable(readers);
+ assertTrue(countRows(table1) > 0);
+ assertTrue(countTombstoneMarkers(table1) > 0);
+
+ deleteWithSomeInserts(5, 6, 0);
+ flush();
+ assertEquals(3, cfs.getLiveSSTables().size());
+ SSTableReader table2 = getNewTable(readers);
+ assertEquals(0, countRows(table2));
+ assertTrue(countTombstoneMarkers(table2) > 0);
+
+ CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+ assertEquals(3, cfs.getLiveSSTables().size());
+ SSTableReader table3 = getNewTable(readers);
+ assertEquals(0, countTombstoneMarkers(table3));
+ assertTrue(rowCount > countRows(table3));
+ }
+
+    @Test
+    public void testGcCompactionCells() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  data int," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell'  };"
+                          );
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int cellCount = countCells(table0);
+
+        deleteWithSomeInserts(3, 0, 2);
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        assertTrue(countCells(table1) > 0);
+        assertEquals(0, countTombstoneMarkers(table1));
+
+        CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table3 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table3));
+        assertTrue(cellCount > countCells(table3));
+    }
+
+ @Test
+ public void testGcCompactionStatic() throws Throwable
+ {
+ createTable("CREATE TABLE %s(" +
+ " key int," +
+ " column int," +
+ " data int static," +
+ " extra text," +
+ " PRIMARY KEY(key, column)" +
+ ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell' };"
+ );
+
+ for (int i = 0; i < KEY_COUNT; ++i)
+ for (int j = 0; j < CLUSTERING_COUNT; ++j)
+ execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+ Set<SSTableReader> readers = new HashSet<>();
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+ flush();
+ assertEquals(1, cfs.getLiveSSTables().size());
+ SSTableReader table0 = getNewTable(readers);
+ assertEquals(0, countTombstoneMarkers(table0));
+ int cellCount = countStaticCells(table0);
+ assertEquals(KEY_COUNT, cellCount);
+
+ execute("DELETE data FROM %s WHERE key = 0"); // delete static cell
+ execute("INSERT INTO %s (key, data) VALUES (1, 0)"); // overwrite static cell
+ flush();
+ assertEquals(2, cfs.getLiveSSTables().size());
+ SSTableReader table1 = getNewTable(readers);
+ assertTrue(countStaticCells(table1) > 0);
+ assertEquals(0, countTombstoneMarkers(table0));
+
+ CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+ assertEquals(2, cfs.getLiveSSTables().size());
+ SSTableReader table3 = getNewTable(readers);
+ assertEquals(0, countTombstoneMarkers(table3));
+ assertEquals(cellCount - 2, countStaticCells(table3));
+ }
+
+ @Test
+ public void testGcCompactionComplexColumn() throws Throwable
+ {
+ createTable("CREATE TABLE %s(" +
+ " key int," +
+ " data map<int, int>," +
+ " extra text," +
+ " PRIMARY KEY(key)" +
+ ") WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell' };"
+ );
+
+ for (int i = 0; i < KEY_COUNT; ++i)
+ for (int j = 0; j < CLUSTERING_COUNT; ++j)
+ execute("UPDATE %s SET data[?] = ? WHERE key = ?", j, i+j, i);
+
+ Set<SSTableReader> readers = new HashSet<>();
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+ flush();
+ assertEquals(1, cfs.getLiveSSTables().size());
+ SSTableReader table0 = getNewTable(readers);
+ assertEquals(0, countTombstoneMarkers(table0));
+ int cellCount = countComplexCells(table0);
+
+ deleteWithSomeInsertsComplexColumn(3, 5, 8);
+ flush();
+ assertEquals(2, cfs.getLiveSSTables().size());
+ SSTableReader table1 = getNewTable(readers);
+ assertTrue(countComplexCells(table1) > 0);
+ assertEquals(0, countTombstoneMarkers(table0));
+
+ CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+ assertEquals(2, cfs.getLiveSSTables().size());
+ SSTableReader table3 = getNewTable(readers);
+ assertEquals(0, countTombstoneMarkers(table3));
+ assertEquals(cellCount - 23, countComplexCells(table3));
+ }
+
+ @Test
+ public void testLocalDeletionTime() throws Throwable
+ {
+ createTable("create table %s (k int, c1 int, primary key (k, c1)) with compaction = {'class': 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones':'row'}");
+ execute("delete from %s where k = 1");
+ Set<SSTableReader> readers = new HashSet<>(getCurrentColumnFamilyStore().getLiveSSTables());
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ SSTableReader oldSSTable = getNewTable(readers);
+ Thread.sleep(2000);
+ execute("delete from %s where k = 1");
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ SSTableReader newTable = getNewTable(readers);
+
+ CompactionManager.instance.forceUserDefinedCompaction(oldSSTable.getFilename());
+
+ // Old table now doesn't contain any data and should disappear.
+ assertEquals(Collections.singleton(newTable), getCurrentColumnFamilyStore().getLiveSSTables());
+ }
+
+ private SSTableReader getNewTable(Set<SSTableReader> readers)
+ {
+ Set<SSTableReader> newOnes = new HashSet<>(getCurrentColumnFamilyStore().getLiveSSTables());
+ newOnes.removeAll(readers);
+ assertEquals(1, newOnes.size());
+ readers.addAll(newOnes);
+ return Iterables.get(newOnes, 0);
+ }
+
+ void deleteWithSomeInserts(int key_step, int delete_step, int readd_step) throws Throwable
+ {
+ for (int i = 0; i < KEY_COUNT; i += key_step)
+ {
+ if (delete_step > 0)
+ for (int j = i % delete_step; j < CLUSTERING_COUNT; j += delete_step)
+ {
+ execute("DELETE FROM %s WHERE key = ? AND column = ?", i, j);
+ }
+ if (readd_step > 0)
+ for (int j = i % readd_step; j < CLUSTERING_COUNT; j += readd_step)
+ {
+ execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i-j, "readded " + i + ":" + j);
+ }
+ }
+ }
+
+ void deleteWithSomeInsertsComplexColumn(int key_step, int delete_step, int readd_step) throws Throwable
+ {
+ for (int i = 0; i < KEY_COUNT; i += key_step)
+ {
+ if (delete_step > 0)
+ for (int j = i % delete_step; j < CLUSTERING_COUNT; j += delete_step)
+ {
+ execute("DELETE data[?] FROM %s WHERE key = ?", j, i);
+ }
+ if (readd_step > 0)
+ for (int j = i % readd_step; j < CLUSTERING_COUNT; j += readd_step)
+ {
+ execute("UPDATE %s SET data[?] = ? WHERE key = ?", j, -(i+j), i);
+ }
+ }
+ }
+
+ int countTombstoneMarkers(SSTableReader reader)
+ {
+ int nowInSec = FBUtilities.nowInSeconds();
+ return count(reader, x -> x.isRangeTombstoneMarker() || x.isRow() && ((Row) x).hasDeletion(nowInSec) ? 1 : 0, x -> x.partitionLevelDeletion().isLive() ? 0 : 1);
+ }
+
+ int countRows(SSTableReader reader)
+ {
+ int nowInSec = FBUtilities.nowInSeconds();
+ return count(reader, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec) ? 1 : 0, x -> 0);
+ }
+
+ int countCells(SSTableReader reader)
+ {
+ return count(reader, x -> x.isRow() ? Iterables.size((Row) x) : 0, x -> 0);
+ }
+
+ int countStaticCells(SSTableReader reader)
+ {
+ return count(reader, x -> 0, x -> Iterables.size(x.staticRow()));
+ }
+
+ int countComplexCells(SSTableReader reader)
+ {
+ return count(reader, x -> x.isRow() ? ((Row) x).stream().mapToInt(this::countComplex).sum() : 0, x -> 0);
+ }
+
+ int countComplex(ColumnData c)
+ {
+ if (!(c instanceof ComplexColumnData))
+ return 0;
+ ComplexColumnData ccd = (ComplexColumnData) c;
+ return ccd.cellsCount();
+ }
+
+ int count(SSTableReader reader, Function<Unfiltered, Integer> predicate, Function<UnfilteredRowIterator, Integer> partitionPredicate)
+ {
+ int instances = 0;
+ try (ISSTableScanner partitions = reader.getScanner())
+ {
+ while (partitions.hasNext())
+ {
+ try (UnfilteredRowIterator iter = partitions.next())
+ {
+ instances += partitionPredicate.apply(iter);
+ while (iter.hasNext())
+ {
+ Unfiltered atom = iter.next();
+ instances += predicate.apply(atom);
+ }
+ }
+ }
+ }
+ return instances;
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index c930b2a..ece2d1d 100644
--- a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@ -51,7 +51,7 @@
String functionName;
@BeforeClass
- public static void setUpClass()
+ public static void setUpClass() // overrides CQLTester.setUpClass()
{
DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
new file mode 100644
index 0000000..2189e15
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import static org.junit.Assert.*;
+
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.*;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+public class CompactionIteratorTest
+{
+
+ private static final int NOW = 1000;
+ private static final int GC_BEFORE = 100;
+ private static final String KSNAME = "CompactionIteratorTest";
+ private static final String CFNAME = "Integer1";
+
+ static final DecoratedKey kk = Util.dk("key");
+ static final CFMetaData metadata;
+ private static final int RANGE = 1000;
+ private static final int COUNT = 100;
+
+ Map<List<Unfiltered>, DeletionTime> deletionTimes = new HashMap<>();
+
+ static {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KSNAME,
+ KeyspaceParams.simple(1),
+ metadata = SchemaLoader.standardCFMD(KSNAME,
+ CFNAME,
+ 1,
+ UTF8Type.instance,
+ Int32Type.instance,
+ Int32Type.instance));
+ }
+
+ // See org.apache.cassandra.db.rows.UnfilteredRowsGenerator.parse for the syntax used in these tests.
+
+ @Test
+ public void testGcCompactionSupersedeLeft()
+ {
+ testCompaction(new String[] {
+ "5<=[140] 10[150] [140]<20 22<[130] [130]<25 30[150]"
+ }, new String[] {
+ "7<[160] 15[180] [160]<30 40[120]"
+ },
+ 3);
+ }
+
+ @Test
+ public void testGcCompactionSupersedeMiddle()
+ {
+ testCompaction(new String[] {
+ "5<=[140] 10[150] [140]<40 60[150]"
+ }, new String[] {
+ "7<=[160] 15[180] [160]<=30 40[120]"
+ },
+ 3);
+ }
+
+ @Test
+ public void testGcCompactionSupersedeRight()
+ {
+ testCompaction(new String[] {
+ "9<=[140] 10[150] [140]<40 60[150]"
+ }, new String[] {
+ "7<[160] 15[180] [160]<30 40[120]"
+ },
+ 3);
+ }
+
+ @Test
+ public void testGcCompactionSwitchInSuperseded()
+ {
+ testCompaction(new String[] {
+ "5<=[140] 10[150] [140]<20 20<=[170] [170]<=50 60[150]"
+ }, new String[] {
+ "7<[160] 15[180] [160]<30 40[120]"
+ },
+ 5);
+ }
+
+ @Test
+ public void testGcCompactionBoundaries()
+ {
+ testCompaction(new String[] {
+ "5<=[120] [120]<9 9<=[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90"
+ }, new String[] {
+ "7<[160] 15[180] [160]<30 40[120] 45<[140] [140]<80 88<=[130] [130]<100"
+ },
+ 7);
+ }
+
+ @Test
+ public void testGcCompactionMatches()
+ {
+ testCompaction(new String[] {
+ "5<=[120] [120]<=9 9<[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90 120<=[100] [100]<130"
+ }, new String[] {
+ "9<[160] 15[180] [160]<40 40[120] 45<[140] [140]<90 90<=[110] [110]<100 120<=[100] [100]<130"
+ },
+ 5);
+ }
+
+ @Test
+ public void testGcCompactionRowDeletion()
+ {
+ testCompaction(new String[] {
+ "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
+ }, new String[] {
+ "10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
+ },
+ "25[160] 30[170] 50[120]");
+ }
+
+ @Test
+ public void testGcCompactionPartitionDeletion()
+ {
+ testCompaction(new String[] {
+ "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
+ }, new String[] {
+ // Dxx| stands for partition deletion at time xx
+ "D165|10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
+ },
+ "30[170]");
+ }
+
+ void testCompaction(String[] inputs, String[] tombstones, String expected)
+ {
+ testNonGcCompaction(inputs, tombstones);
+
+ UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+ List<List<Unfiltered>> inputLists = parse(inputs, generator);
+ List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+ List<Unfiltered> result = compact(inputLists, tombstoneLists);
+ System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
+ generator.verifyValid(result);
+ verifyEquivalent(inputLists, result, tombstoneLists, generator);
+ List<Unfiltered> expectedResult = generator.parse(expected, NOW - 1);
+ if (!expectedResult.equals(result))
+ fail("Expected " + expected + ", got " + generator.str(result));
+ }
+
+ void testCompaction(String[] inputs, String[] tombstones, int expectedCount)
+ {
+ testNonGcCompaction(inputs, tombstones);
+
+ UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+ List<List<Unfiltered>> inputLists = parse(inputs, generator);
+ List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+ List<Unfiltered> result = compact(inputLists, tombstoneLists);
+ System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
+ generator.verifyValid(result);
+ verifyEquivalent(inputLists, result, tombstoneLists, generator);
+ if (size(result) > expectedCount)
+ fail("Expected compaction with " + expectedCount + " elements, got " + size(result) + ": " + generator.str(result));
+ }
+
+ int testNonGcCompaction(String[] inputs, String[] tombstones)
+ {
+ UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+ List<List<Unfiltered>> inputLists = parse(inputs, generator);
+ List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+ List<Unfiltered> result = compact(inputLists, Collections.emptyList());
+ System.out.println("Non-GC compaction resulted in " + size(result) + " Unfiltereds");
+ generator.verifyValid(result);
+ verifyEquivalent(inputLists, result, tombstoneLists, generator);
+ return size(result);
+ }
+
+ private static int size(List<Unfiltered> data) // boundary markers count as 2 since they stand for a close plus an open bound
+ {
+ return data.stream().mapToInt(x -> x instanceof RangeTombstoneBoundaryMarker ? 2 : 1).sum();
+ }
+
+ private void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> result, List<List<Unfiltered>> tombstoneSources, UnfilteredRowsGenerator generator) // fails if compacting result+tombstones differs from compacting sources+tombstones
+ {
+ // sources + tombstoneSources must be the same as result + tombstoneSources
+ List<Unfiltered> expected = compact(Iterables.concat(sources, tombstoneSources), Collections.emptyList());
+ List<Unfiltered> actual = compact(Iterables.concat(ImmutableList.of(result), tombstoneSources), Collections.emptyList());
+ if (!expected.equals(actual))
+ {
+ System.out.println("Equivalence test failure between sources:"); // dump full context before failing to ease debugging
+ for (List<Unfiltered> partition : sources)
+ generator.dumpList(partition);
+ System.out.println("and compacted " + generator.str(result));
+ System.out.println("with tombstone sources:");
+ for (List<Unfiltered> partition : tombstoneSources)
+ generator.dumpList(partition);
+ System.out.println("expected " + generator.str(expected));
+ System.out.println("got " + generator.str(actual));
+ fail("Failed equivalence test.");
+ }
+ }
+
+ private List<List<Unfiltered>> parse(String[] inputs, UnfilteredRowsGenerator generator) // parses each specification string into an unfiltered list
+ {
+ return ImmutableList.copyOf(Lists.transform(Arrays.asList(inputs), x -> parse(x, generator)));
+ }
+
+ private List<Unfiltered> parse(String input, UnfilteredRowsGenerator generator) // an optional "Dnn|" prefix attaches a partition-level deletion time nn to the parsed list
+ {
+ Matcher m = Pattern.compile("D(\\d+)\\|").matcher(input);
+ if (m.lookingAt())
+ {
+ int del = Integer.parseInt(m.group(1));
+ input = input.substring(m.end());
+ List<Unfiltered> list = generator.parse(input, NOW - 1);
+ deletionTimes.put(list, new DeletionTime(del, del)); // remembered per list; consumed by listToIterator
+ return list;
+ }
+ else
+ return generator.parse(input, NOW - 1);
+ }
+
+ private List<Unfiltered> compact(Iterable<List<Unfiltered>> sources, Iterable<List<Unfiltered>> tombstoneSources) // runs a CompactionIterator over sources, exposing tombstoneSources through the test Controller's shadowSources
+ {
+ List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(sources, list -> ImmutableList.of(listToIterator(list, kk))));
+ Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>();
+ transformedSources.put(kk, Iterables.transform(tombstoneSources, list -> listToIterator(list, kk))); // all data lives under the single key kk
+ try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
+ CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+ Lists.transform(content, x -> new Scanner(x)),
+ controller, NOW, null))
+ {
+ List<Unfiltered> result = new ArrayList<>();
+ assertTrue(iter.hasNext());
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ Iterators.addAll(result, partition);
+ }
+ assertFalse(iter.hasNext()); // exactly one partition expected since every source uses kk
+ return result;
+ }
+ }
+
+ private UnfilteredRowIterator listToIterator(List<Unfiltered> list, DecoratedKey key) // wraps a list as a row iterator, applying any partition deletion recorded by parse()
+ {
+ return UnfilteredRowsGenerator.source(list, metadata, key, deletionTimes.getOrDefault(list, DeletionTime.LIVE));
+ }
+
+ NavigableMap<DecoratedKey, List<Unfiltered>> generateContent(Random rand, UnfilteredRowsGenerator generator,
+ List<DecoratedKey> keys, int pcount, int rcount) // pcount partitions over randomly chosen keys; a repeated key overwrites the earlier entry
+ {
+ NavigableMap<DecoratedKey, List<Unfiltered>> map = new TreeMap<>();
+ for (int i = 0; i < pcount; ++i)
+ {
+ DecoratedKey key = keys.get(rand.nextInt(keys.size()));
+ map.put(key, generator.generateSource(rand, rcount, RANGE, NOW - 5, x -> NOW - 1));
+ }
+ return map;
+ }
+
+ @Test
+ public void testRandom() // randomized round-trip: compacted output must stay equivalent to the inputs and strictly smaller
+ {
+ UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+ for (int seed = 1; seed < 100; ++seed)
+ {
+ Random rand = new Random(seed); // fixed seeds keep the test deterministic
+ List<List<Unfiltered>> sources = new ArrayList<>();
+ for (int i = 0; i < 10; ++i)
+ sources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15));
+ int srcSz = sources.stream().mapToInt(CompactionIteratorTest::size).sum();
+ List<List<Unfiltered>> tombSources = new ArrayList<>();
+ for (int i = 0; i < 10; ++i)
+ tombSources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15)); // fix: populate tombSources (was erroneously adding to sources, leaving the tombstone list empty and doubling the inputs)
+ List<Unfiltered> result = compact(sources, tombSources);
+ verifyEquivalent(sources, result, tombSources, generator);
+ assertTrue(size(result) < srcSz);
+ }
+ }
+
+ class Controller extends CompactionController // test controller serving pre-canned per-key tombstone sources to GC compaction
+ {
+ private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources;
+
+ public Controller(ColumnFamilyStore cfs, Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources, int gcBefore)
+ {
+ super(cfs, Collections.emptySet(), gcBefore); // no compacting sstables -- inputs come through scanners instead
+ this.tombstoneSources = tombstoneSources;
+ }
+
+ @Override
+ public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly)
+ {
+ assert tombstoneOnly; // these tests only exercise the tombstone-only shadowing path
+ return tombstoneSources.get(key);
+ }
+ }
+
+ class Scanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner // minimal in-memory sstable scanner; size/position metrics are stubbed
+ {
+ Iterator<UnfilteredRowIterator> iter;
+
+ Scanner(Iterable<UnfilteredRowIterator> content)
+ {
+ iter = content.iterator();
+ }
+
+ @Override
+ public boolean isForThrift()
+ {
+ return false;
+ }
+
+ @Override
+ public CFMetaData metadata()
+ {
+ return metadata; // enclosing test's table metadata
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return iter.hasNext();
+ }
+
+ @Override
+ public UnfilteredRowIterator next()
+ {
+ return iter.next();
+ }
+
+ @Override
+ public long getLengthInBytes()
+ {
+ return 0; // no backing file
+ }
+
+ @Override
+ public long getCurrentPosition()
+ {
+ return 0; // no backing file
+ }
+
+ @Override
+ public String getBackingFiles()
+ {
+ return null; // no backing file
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index 0eeb379..3335c02 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -96,9 +96,11 @@
@SuppressWarnings("unused")
public void testTombstoneMerge(boolean reversed, boolean iterations)
{
+ this.reversed = reversed;
+ UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(comparator, reversed);
+
for (int seed = 1; seed <= 100; ++seed)
{
- this.reversed = reversed;
if (ITEMS <= 20)
System.out.println("\nSeed " + seed);
@@ -112,27 +114,29 @@
if (ITEMS <= 20)
System.out.println("Merging");
for (int i=0; i<ITERATORS; ++i)
- sources.add(generateSource(r, timeGenerators.get(r.nextInt(timeGenerators.size()))));
+ sources.add(generator.generateSource(r, ITEMS, RANGE, DEL_RANGE, timeGenerators.get(r.nextInt(timeGenerators.size()))));
List<Unfiltered> merged = merge(sources, iterations);
if (ITEMS <= 20)
System.out.println("results in");
if (ITEMS <= 20)
- dumpList(merged);
- verifyEquivalent(sources, merged);
- verifyValid(merged);
+ generator.dumpList(merged);
+ verifyEquivalent(sources, merged, generator);
+ generator.verifyValid(merged);
if (reversed)
{
Collections.reverse(merged);
- this.reversed = false;
- verifyValid(merged);
+ generator.verifyValid(merged, false);
}
}
}
private List<Unfiltered> merge(List<List<Unfiltered>> sources, boolean iterations)
{
- List<UnfilteredRowIterator> us = sources.stream().map(l -> new Source(l.iterator())).collect(Collectors.toList());
+ List<UnfilteredRowIterator> us = sources.
+ stream().
+ map(l -> new UnfilteredRowsGenerator.Source(l.iterator(), metadata, partitionKey, DeletionTime.LIVE, reversed)).
+ collect(Collectors.toList());
List<Unfiltered> merged = new ArrayList<>();
Iterators.addAll(merged, mergeIterators(us, iterations));
return merged;
@@ -285,24 +289,24 @@
}
}
- void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> merged)
+ void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> merged, UnfilteredRowsGenerator generator)
{
try
{
for (int i=0; i<RANGE; ++i)
{
- Clusterable c = clusteringFor(i);
+ Clusterable c = UnfilteredRowsGenerator.clusteringFor(i);
DeletionTime dt = DeletionTime.LIVE;
for (List<Unfiltered> source : sources)
{
dt = deletionFor(c, source, dt);
}
- Assert.assertEquals("Deletion time mismatch for position " + str(c), dt, deletionFor(c, merged));
+ Assert.assertEquals("Deletion time mismatch for position " + i, dt, deletionFor(c, merged));
if (dt == DeletionTime.LIVE)
{
Optional<Unfiltered> sourceOpt = sources.stream().map(source -> rowFor(c, source)).filter(x -> x != null).findAny();
Unfiltered mergedRow = rowFor(c, merged);
- Assert.assertEquals("Content mismatch for position " + str(c), str(sourceOpt.orElse(null)), str(mergedRow));
+ Assert.assertEquals("Content mismatch for position " + i, clustering(sourceOpt.orElse(null)), clustering(mergedRow));
}
}
}
@@ -310,13 +314,20 @@
{
System.out.println(e);
for (List<Unfiltered> list : sources)
- dumpList(list);
+ generator.dumpList(list);
System.out.println("merged");
- dumpList(merged);
+ generator.dumpList(merged);
throw e;
}
}
+ String clustering(Clusterable curr)
+ {
+ if (curr == null)
+ return "null";
+ return Int32Type.instance.getString(curr.clustering().get(0));
+ }
+
private Unfiltered rowFor(Clusterable pointer, List<Unfiltered> list)
{
int index = Collections.binarySearch(list, pointer, reversed ? comparator.reversed() : comparator);
@@ -424,21 +435,23 @@
public void testForInput(String... inputs)
{
+ reversed = false;
+ UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(comparator, false);
+
List<List<Unfiltered>> sources = new ArrayList<>();
for (String input : inputs)
{
- List<Unfiltered> source = parse(input);
- attachBoundaries(source);
- dumpList(source);
- verifyValid(source);
+ List<Unfiltered> source = generator.parse(input, DEL_RANGE);
+ generator.dumpList(source);
+ generator.verifyValid(source);
sources.add(source);
}
List<Unfiltered> merged = merge(sources, false);
System.out.println("Merge to:");
- dumpList(merged);
- verifyEquivalent(sources, merged);
- verifyValid(merged);
+ generator.dumpList(merged);
+ verifyEquivalent(sources, merged, generator);
+ generator.verifyValid(merged);
System.out.println();
}
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
new file mode 100644
index 0000000..7cdccdb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.rows.Unfiltered.Kind;
+import org.apache.cassandra.utils.btree.BTree;
+
+public class UnfilteredRowsGenerator
+{
+ final boolean reversed;
+ final Comparator<Clusterable> comparator;
+
+ public UnfilteredRowsGenerator(Comparator<Clusterable> comparator, boolean reversed) // reversed: emit and validate lists in reversed clustering order
+ {
+ this.reversed = reversed;
+ this.comparator = comparator;
+ }
+
+ String str(Clusterable curr) // compact text form: "[dt]<=x" close marker, "x<[dt]" open marker, "x[tsDdt]" row
+ {
+ if (curr == null)
+ return "null";
+ String val = Int32Type.instance.getString(curr.clustering().get(0)); // clustering is a single int column
+ if (curr instanceof RangeTombstoneMarker)
+ {
+ RangeTombstoneMarker marker = (RangeTombstoneMarker) curr;
+ if (marker.isClose(reversed))
+ val = "[" + marker.closeDeletionTime(reversed).markedForDeleteAt() + "]" + (marker.closeIsInclusive(reversed) ? "<=" : "<") + val;
+ if (marker.isOpen(reversed))
+ val = val + (marker.openIsInclusive(reversed) ? "<=" : "<") + "[" + marker.openDeletionTime(reversed).markedForDeleteAt() + "]";
+ }
+ else if (curr instanceof Row)
+ {
+ Row row = (Row) curr;
+ String delTime = "";
+ if (!row.deletion().time().isLive())
+ delTime = "D" + row.deletion().time().markedForDeleteAt();
+ val = val + "[" + row.primaryKeyLivenessInfo().timestamp() + delTime + "]";
+ }
+ return val;
+ }
+
+ public void verifyValid(List<Unfiltered> list) // validates using this generator's configured direction
+ {
+ verifyValid(list, reversed);
+ }
+
+ void verifyValid(List<Unfiltered> list, boolean reversed) // asserts strict clustering order and correctly paired open/close markers with matching deletion times
+ {
+ int reversedAsMultiplier = reversed ? -1 : 1; // flips comparisons so a reversed list can be checked with the same comparator
+ try {
+ RangeTombstoneMarker prev = null;
+ Unfiltered prevUnfiltered = null;
+ for (Unfiltered unfiltered : list)
+ {
+ Assert.assertTrue("Order violation prev " + str(prevUnfiltered) + " curr " + str(unfiltered),
+ prevUnfiltered == null || comparator.compare(prevUnfiltered, unfiltered) * reversedAsMultiplier < 0);
+ prevUnfiltered = unfiltered;
+
+ if (unfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER)
+ {
+ RangeTombstoneMarker curr = (RangeTombstoneMarker) unfiltered;
+ if (prev != null)
+ {
+ if (curr.isClose(reversed))
+ {
+ Assert.assertTrue(str(unfiltered) + " follows another close marker " + str(prev), prev.isOpen(reversed));
+ Assert.assertEquals("Deletion time mismatch for open " + str(prev) + " and close " + str(unfiltered),
+ prev.openDeletionTime(reversed),
+ curr.closeDeletionTime(reversed));
+ }
+ else
+ Assert.assertFalse(str(curr) + " follows another open marker " + str(prev), prev.isOpen(reversed));
+ }
+
+ prev = curr; // prev tracks the latest marker only; intervening rows do not reset it
+ }
+ }
+ Assert.assertFalse("Cannot end in open marker " + str(prev), prev != null && prev.isOpen(reversed));
+
+ } catch (AssertionError e) {
+ System.out.println(e);
+ dumpList(list); // dump the offending list before rethrowing for easier debugging
+ throw e;
+ }
+ }
+
+ public List<Unfiltered> generateSource(Random r, int items, int range, int del_range, Function<Integer, Integer> timeGenerator) // random mix of rows and range tombstones over clusterings [0, range); deletion times drawn from [0, del_range)
+ {
+ int[] positions = new int[items + 1];
+ for (int i=0; i<items; ++i)
+ positions[i] = r.nextInt(range);
+ positions[items] = range; // sentinel so positions[i + 1] is always valid
+ Arrays.sort(positions);
+
+ List<Unfiltered> content = new ArrayList<>(items);
+ int prev = -1; // rightmost position already covered, to keep output strictly ordered
+ for (int i=0; i<items; ++i)
+ {
+ int pos = positions[i];
+ int sz = positions[i + 1] - pos;
+ if (sz == 0 && pos == prev)
+ // Filter out more than two of the same position.
+ continue;
+ if (r.nextBoolean() || pos == prev) // a repeated position can only become a range tombstone, never a second row
+ {
+ int span;
+ boolean includesStart;
+ boolean includesEnd;
+ if (pos > prev)
+ {
+ span = r.nextInt(sz + 1);
+ includesStart = span > 0 ? r.nextBoolean() : true; // zero-width ranges must include both bounds
+ includesEnd = span > 0 ? r.nextBoolean() : true;
+ }
+ else
+ {
+ span = 1 + r.nextInt(sz); // force progress past the already-covered position
+ includesStart = false;
+ includesEnd = r.nextBoolean();
+ }
+ int deltime = r.nextInt(del_range);
+ DeletionTime dt = new DeletionTime(deltime, deltime);
+ content.add(new RangeTombstoneBoundMarker(boundFor(pos, true, includesStart), dt));
+ content.add(new RangeTombstoneBoundMarker(boundFor(pos + span, false, includesEnd), dt));
+ prev = pos + span - (includesEnd ? 0 : 1);
+ }
+ else
+ {
+ content.add(emptyRowAt(pos, timeGenerator));
+ prev = pos;
+ }
+ }
+
+ attachBoundaries(content); // merge adjacent close/open pairs into boundary markers
+ if (reversed)
+ {
+ Collections.reverse(content);
+ }
+ verifyValid(content);
+ if (items <= 20)
+ dumpList(content);
+ return content;
+ }
+
+ /**
+ * Constructs a list of unfiltereds with integer clustering according to the specification string.
+ *
+ * The string is a space-delimited sorted list that can contain:
+ * * open tombstone markers, e.g. xx<[yy] where xx is the clustering, yy is the deletion time, and "<" stands for
+ * non-inclusive (<= for inclusive).
+ * * close tombstone markers, e.g. [yy]<=xx. Adjacent close and open markers (e.g. [yy]<=xx xx<[zz]) are combined
+ * into boundary markers.
+ * * empty rows, e.g. xx or xx[yy] or xx[yyDzz] where xx is the clustering, yy is the live time and zz is deletion
+ * time.
+ *
+ * @param input Specification.
+ * @param default_liveness Liveness to use for rows if not explicitly specified.
+ * @return Parsed list.
+ */
+ public List<Unfiltered> parse(String input, int default_liveness) // see class javadoc above for the specification grammar
+ {
+ String[] split = input.split(" ");
+ Pattern open = Pattern.compile("(\\d+)<(=)?\\[(\\d+)\\]"); // NOTE(review): patterns could be hoisted to static finals, but this is test-only code
+ Pattern close = Pattern.compile("\\[(\\d+)\\]<(=)?(\\d+)");
+ Pattern row = Pattern.compile("(\\d+)(\\[(\\d+)(?:D(\\d+))?\\])?");
+ List<Unfiltered> out = new ArrayList<>(split.length);
+ for (String s : split)
+ {
+ Matcher m = open.matcher(s);
+ if (m.matches())
+ {
+ out.add(openMarker(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(3)), m.group(2) != null));
+ continue;
+ }
+ m = close.matcher(s);
+ if (m.matches())
+ {
+ out.add(closeMarker(Integer.parseInt(m.group(3)), Integer.parseInt(m.group(1)), m.group(2) != null));
+ continue;
+ }
+ m = row.matcher(s);
+ if (m.matches())
+ {
+ int live = m.group(3) != null ? Integer.parseInt(m.group(3)) : default_liveness;
+ int delTime = m.group(4) != null ? Integer.parseInt(m.group(4)) : -1; // -1 means no row deletion
+ out.add(emptyRowAt(Integer.parseInt(m.group(1)), live, delTime));
+ continue;
+ }
+ Assert.fail("Can't parse " + s);
+ }
+ attachBoundaries(out); // combine adjacent close/open markers into boundaries
+ return out;
+ }
+
+ static Row emptyRowAt(int pos, Function<Integer, Integer> timeGenerator) // live row with no cells; timestamp supplied by timeGenerator(pos)
+ {
+ final Clustering clustering = clusteringFor(pos);
+ final LivenessInfo live = LivenessInfo.create(timeGenerator.apply(pos), UnfilteredRowIteratorsMergeTest.nowInSec);
+ return BTreeRow.noCellLiveRow(clustering, live);
+ }
+
+ static Row emptyRowAt(int pos, int time, int deletionTime) // cell-less row with explicit timestamp; deletionTime == -1 means not deleted
+ {
+ final Clustering clustering = clusteringFor(pos);
+ final LivenessInfo live = LivenessInfo.create(time, UnfilteredRowIteratorsMergeTest.nowInSec);
+ final DeletionTime delTime = deletionTime == -1 ? DeletionTime.LIVE : new DeletionTime(deletionTime, deletionTime);
+ return BTreeRow.create(clustering, live, Row.Deletion.regular(delTime), BTree.empty());
+ }
+
+ static Clustering clusteringFor(int i) // single int clustering column
+ {
+ return Clustering.make(Int32Type.instance.decompose(i));
+ }
+
+ static ClusteringBound boundFor(int pos, boolean start, boolean inclusive) // range-tombstone bound at the given clustering position
+ {
+ return ClusteringBound.create(ClusteringBound.boundKind(start, inclusive), new ByteBuffer[] {Int32Type.instance.decompose(pos)});
+ }
+
+ static void attachBoundaries(List<Unfiltered> content) // in place, replaces each adjacent close/open marker pair at the same position with a single boundary marker
+ {
+ int di = 0; // write index; list is compacted in place
+ RangeTombstoneMarker prev = null;
+ for (int si = 0; si < content.size(); ++si)
+ {
+ Unfiltered currUnfiltered = content.get(si);
+ RangeTombstoneMarker curr = currUnfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER ?
+ (RangeTombstoneMarker) currUnfiltered :
+ null;
+ if (prev != null && curr != null && prev.isClose(false) && curr.isOpen(false) && prev.clustering().invert().equals(curr.clustering()))
+ {
+ // Join. Prefer not to use merger to check its correctness.
+ ClusteringBound b = (ClusteringBound) prev.clustering();
+ ClusteringBoundary boundary = ClusteringBoundary.create(
+ b.isInclusive() ? ClusteringBound.Kind.INCL_END_EXCL_START_BOUNDARY : ClusteringBound.Kind.EXCL_END_INCL_START_BOUNDARY,
+ b.getRawValues());
+ prev = new RangeTombstoneBoundaryMarker(boundary, prev.closeDeletionTime(false), curr.openDeletionTime(false));
+ currUnfiltered = prev;
+ --di; // overwrite the close marker written on the previous iteration
+ }
+ content.set(di++, currUnfiltered);
+ prev = curr;
+ }
+ for (int pos = content.size() - 1; pos >= di; --pos) // trim the tail left over from joins
+ content.remove(pos);
+ }
+
+ static RangeTombstoneMarker openMarker(int pos, int delTime, boolean inclusive) // open (start) bound at pos
+ {
+ return marker(pos, delTime, true, inclusive);
+ }
+
+ static RangeTombstoneMarker closeMarker(int pos, int delTime, boolean inclusive) // close (end) bound at pos
+ {
+ return marker(pos, delTime, false, inclusive);
+ }
+
+ private static RangeTombstoneMarker marker(int pos, int delTime, boolean isStart, boolean inclusive) // bound marker with identical timestamp and local deletion time
+ {
+ return new RangeTombstoneBoundMarker(ClusteringBound.create(ClusteringBound.boundKind(isStart, inclusive),
+ new ByteBuffer[] {clusteringFor(pos).get(0)}),
+ new DeletionTime(delTime, delTime));
+ }
+
+ public static UnfilteredRowIterator source(Iterable<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey) // convenience overload with no partition-level deletion
+ {
+ return source(content, metadata, partitionKey, DeletionTime.LIVE);
+ }
+
+ public static UnfilteredRowIterator source(Iterable<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey, DeletionTime delTime) // wraps content as a forward partition iterator
+ {
+ return new Source(content.iterator(), metadata, partitionKey, delTime, false);
+ }
+
+ static class Source extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator // row iterator backed by a pre-built list; no static row, no stats
+ {
+ Iterator<Unfiltered> content;
+
+ protected Source(Iterator<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey, DeletionTime partitionLevelDeletion, boolean reversed)
+ {
+ super(metadata,
+ partitionKey,
+ partitionLevelDeletion,
+ metadata.partitionColumns(),
+ Rows.EMPTY_STATIC_ROW,
+ reversed,
+ EncodingStats.NO_STATS);
+ this.content = content;
+ }
+
+ @Override
+ protected Unfiltered computeNext()
+ {
+ return content.hasNext() ? content.next() : endOfData();
+ }
+ }
+
+ public String str(List<Unfiltered> list) // space-separated rendering of each element via str(Clusterable)
+ {
+ StringBuilder builder = new StringBuilder();
+ for (Unfiltered u : list)
+ {
+ builder.append(str(u));
+ builder.append(' ');
+ }
+ return builder.toString();
+ }
+
+ public void dumpList(List<Unfiltered> list) // debugging aid: prints the rendered list to stdout
+ {
+ System.out.println(str(list));
+ }
+}
\ No newline at end of file