blob: d47bd8c1d281841055bbaa030c6bc1f1056c16dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.rows;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
/**
* An iterator that merges a source of rows with the range tombstone and partition level deletion of a give partition.
* <p>
* This is used by our {@code Partition} implementations to produce a {@code UnfilteredRowIterator} by merging the rows
* and deletion infos that are kept separate. This has also 2 additional role:
* 1) this make sure the row returned only includes the columns selected for the resulting iterator.
* 2) this (optionally) remove any data that can be shadowed (see commet on 'removeShadowedData' below for more details)
*/
public class RowAndDeletionMergeIterator extends AbstractUnfilteredRowIterator
{
// For some of our Partition implementation, we can't guarantee that the deletion information (partition level
// deletion and range tombstones) don't shadow data in the rows. If that is the case, this class also take
// cares of skipping such shadowed data (since it is the contract of an UnfilteredRowIterator that it doesn't
// shadow its own data). Sometimes however, we know this can't happen, in which case we can skip that step.
private final boolean removeShadowedData;
private final Comparator<Clusterable> comparator;
private final ColumnFilter selection;
private final Iterator<Row> rows;
private Row nextRow;
private final Iterator<RangeTombstone> ranges;
private RangeTombstone nextRange;
// The currently open tombstone. Note that unless this is null, there is no point in checking nextRange.
private RangeTombstone openRange;
public RowAndDeletionMergeIterator(CFMetaData metadata,
DecoratedKey partitionKey,
DeletionTime partitionLevelDeletion,
ColumnFilter selection,
Row staticRow,
boolean isReversed,
EncodingStats stats,
Iterator<Row> rows,
Iterator<RangeTombstone> ranges,
boolean removeShadowedData)
{
super(metadata, partitionKey, partitionLevelDeletion, selection.fetchedColumns(), staticRow, isReversed, stats);
this.comparator = isReversed ? metadata.comparator.reversed() : metadata.comparator;
this.selection = selection;
this.removeShadowedData = removeShadowedData;
this.rows = rows;
this.ranges = ranges;
}
private Unfiltered computeNextInternal()
{
while (true)
{
updateNextRow();
if (nextRow == null)
{
if (openRange != null)
return closeOpenedRange();
updateNextRange();
return nextRange == null ? endOfData() : openRange();
}
// We have a next row
if (openRange == null)
{
// We have no currently open tombstone range. So check if we have a next range and if it sorts before this row.
// If it does, the opening of that range should go first. Otherwise, the row goes first.
updateNextRange();
if (nextRange != null && comparator.compare(openBound(nextRange), nextRow.clustering()) < 0)
return openRange();
Row row = consumeNextRow();
// it's possible for the row to be fully shadowed by the current range tombstone
if (row != null)
return row;
}
else
{
// We have both a next row and a currently opened tombstone. Check which goes first between the range closing and the row.
if (comparator.compare(closeBound(openRange), nextRow.clustering()) < 0)
return closeOpenedRange();
Row row = consumeNextRow();
if (row != null)
return row;
}
}
}
/**
* RangeTombstoneList doesn't correctly merge multiple superseded rts, or overlapping rts with the
* same ts. This causes it to emit noop boundary markers which can cause unneeded read repairs and
* repair over streaming. This should technically be fixed in RangeTombstoneList. However, fixing
* it isn't trivial and that class is already so complicated that the fix would have a good chance
* of adding a worse bug. So we just swallow the noop boundary markers here. See CASSANDRA-14894
*/
private static boolean shouldSkip(Unfiltered unfiltered)
{
if (unfiltered == null || !unfiltered.isRangeTombstoneMarker())
return false;
RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
if (!marker.isBoundary())
return false;
DeletionTime open = marker.openDeletionTime(false);
DeletionTime close = marker.closeDeletionTime(false);
return open.equals(close);
}
@Override
protected Unfiltered computeNext()
{
while (true)
{
Unfiltered next = computeNextInternal();
if (shouldSkip(next))
continue;
return next;
}
}
private void updateNextRow()
{
if (nextRow == null && rows.hasNext())
nextRow = rows.next();
}
private void updateNextRange()
{
while (nextRange == null && ranges.hasNext())
{
nextRange = ranges.next();
if ((removeShadowedData && partitionLevelDeletion().supersedes(nextRange.deletionTime()))
|| nextRange.deletedSlice().isEmpty(metadata.comparator))
nextRange = null;
}
}
private Row consumeNextRow()
{
Row row = nextRow;
nextRow = null;
if (!removeShadowedData)
return row.filter(selection, metadata());
DeletionTime activeDeletion = openRange == null ? partitionLevelDeletion() : openRange.deletionTime();
return row.filter(selection, activeDeletion, false, metadata());
}
private RangeTombstone consumeNextRange()
{
RangeTombstone range = nextRange;
nextRange = null;
return range;
}
private RangeTombstone consumeOpenRange()
{
RangeTombstone range = openRange;
openRange = null;
return range;
}
private Slice.Bound openBound(RangeTombstone range)
{
return range.deletedSlice().open(isReverseOrder());
}
private Slice.Bound closeBound(RangeTombstone range)
{
return range.deletedSlice().close(isReverseOrder());
}
private RangeTombstoneMarker closeOpenedRange()
{
// Check if that close if actually a boundary between markers
updateNextRange();
RangeTombstoneMarker marker;
if (nextRange != null && comparator.compare(closeBound(openRange), openBound(nextRange)) == 0)
{
marker = RangeTombstoneBoundaryMarker.makeBoundary(isReverseOrder(), closeBound(openRange), openBound(nextRange), openRange.deletionTime(), nextRange.deletionTime());
openRange = consumeNextRange();
}
else
{
RangeTombstone toClose = consumeOpenRange();
marker = new RangeTombstoneBoundMarker(closeBound(toClose), toClose.deletionTime());
}
return marker;
}
private RangeTombstoneMarker openRange()
{
assert openRange == null && nextRange != null;
openRange = consumeNextRange();
return new RangeTombstoneBoundMarker(openBound(openRange), openRange.deletionTime());
}
}