blob: f80b113097a5fe97ff71131e7bf76bc7bfdb1988 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
import java.util.function.LongPredicate;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PurgeFunction;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.transform.MoreRows;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
@NotThreadSafe
class RepairedDataInfo
{
/**
 * Shared do-nothing instance: every decorating method hands its argument straight back, so no
 * digest is ever accumulated and {@link #getDigest()} yields an empty buffer.
 * <p>
 * Presumably used by reads that do not track repaired data (callers are outside this file).
 * It is constructed with a {@code null} counter, which is safe only because every method that
 * would dereference the counter is overridden here.
 */
public static final RepairedDataInfo NO_OP_REPAIRED_DATA_INFO = new RepairedDataInfo(null)
{
    @Override
    public UnfilteredPartitionIterator withRepairedDataInfo(UnfilteredPartitionIterator iterator)
    {
        return iterator;
    }

    @Override
    public UnfilteredRowIterator withRepairedDataInfo(UnfilteredRowIterator iterator)
    {
        return iterator;
    }

    @Override
    public UnfilteredPartitionIterator extend(UnfilteredPartitionIterator partitions, DataLimits.Counter limit)
    {
        return partitions;
    }

    @Override
    void markInconclusive()
    {
        // Deliberately a no-op. This object is a static singleton, so flipping the flag
        // here would make isConclusive() return false for every other user of the shared
        // instance, breaking the documented guarantee that it is always true when
        // tracking is not active.
    }
};
// Keeps a digest of the partition currently being processed. Since we won't know
// whether a partition will be fully purged from a read result until it's been
// consumed, we buffer this per-partition digest and add it to the final digest
// when the partition is closed (if it wasn't fully purged).
// Created lazily by getPerPartitionDigest() and reset to null on partition close.
private Digest perPartitionDigest;
// Digest over the whole command: updated with the digest bytes of each partition that
// was not fully purged. Stays null until the first such partition is closed.
private Digest perCommandDigest;
// True until markInconclusive() is called; this class never resets it.
private boolean isConclusive = true;
// Cached result of getDigest(); null until getDigest() is first called.
private ByteBuffer calculatedDigest = null;
// Used only to decide what is excluded from the digest: it doesn't actually purge
// anything from the underlying iterators, whose contents are passed on unmodified.
// The purger can't be initialized until we've iterated all the sstables for the query,
// as it requires the oldest unrepaired tombstone (see prepare()).
private RepairedDataPurger purger;
// True while nothing in the current partition has survived the purger; set to false as
// soon as a deletion, marker or row survives, and reset to true on partition close.
private boolean isFullyPurged = true;
// Supplies additional partitions from the repaired data set, to be consumed once the limit
// of the executing ReadCommand has been reached. This is to ensure that each replica attempts
// to read the same amount of repaired data, otherwise comparisons of the repaired data digests
// may be invalidated by varying amounts of repaired data being present on each replica.
// This can't be initialized until after the underlying repaired iterators have been merged.
private UnfilteredPartitionIterator postLimitPartitions = null;
// Counter supplied at construction; once isDone() returns true no further repaired data
// is digested or over-read. Null only for NO_OP_REPAIRED_DATA_INFO.
private final DataLimits.Counter repairedCounter;
// The most recently wrapped partition (set in onNewPartition); extend() drains it first
// when over-reading.
private UnfilteredRowIterator currentPartition;
// Table metrics captured in prepare(); null means prepare() has not been called.
private TableMetrics metrics;
/**
* Creates a tracker whose digest calculation and over-reading are gated by the supplied counter.
*
* @param repairedCounter counter whose {@code isDone()} is checked before repaired data is
* digested or over-read. {@link #NO_OP_REPAIRED_DATA_INFO} passes {@code null}, which is
* only safe because it overrides the methods that dereference the counter.
*/
public RepairedDataInfo(DataLimits.Counter repairedCounter)
{
this.repairedCounter = repairedCounter;
}
/**
 * Returns the digest of the repaired data read during local execution of a command.
 * <p>
 * The buffer is empty when no per-command digest has been accumulated, which is the case if
 * repaired status tracking is not active, the command has not yet been executed, or no
 * repaired data was read. The value is computed on the first call and cached, so every
 * later call returns that same buffer.
 *
 * @return a digest of the repaired data read, or an empty buffer if there is none
 */
ByteBuffer getDigest()
{
    if (calculatedDigest == null)
    {
        // First request: freeze whatever has been accumulated so far.
        if (perCommandDigest == null)
            calculatedDigest = ByteBufferUtil.EMPTY_BYTE_BUFFER;
        else
            calculatedDigest = ByteBuffer.wrap(perCommandDigest.digest());
    }
    return calculatedDigest;
}
/**
 * Readies this instance for digest tracking by building the purger used to exclude
 * purgeable data from the digest and by capturing the table's metrics object.
 *
 * @param cfs the table being read; supplies the purge settings and the metrics
 * @param nowInSec the read's notion of "now", in seconds
 * @param oldestUnrepairedTombstone passed through to the purger
 */
void prepare(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
{
    final RepairedDataPurger digestPurger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
    purger = digestPurger;
    metrics = cfs.metric;
}
/**
* Supplies the iterator of further repaired partitions that {@code extend} may consume once
* the limits of the read itself have been reached.
* <p>
* Despite the name, this is an overload that takes an argument; it does not override the
* no-arg {@link Object#finalize()} and has nothing to do with garbage collection.
*
* @param postLimitPartitions additional partitions from the repaired data set; a {@code null}
* value is tolerated by the over-read logic, which then only drains the current partition
*/
void finalize(UnfilteredPartitionIterator postLimitPartitions)
{
this.postLimitPartitions = postLimitPartitions;
}
/**
* Indicates whether the repaired data digest was produced without skipping any relevant
* sstables during the read.
*
* If true, then no pending repair sessions or partition deletes have influenced the extent
* of the repaired sstables that went into generating the digest.
* This indicates whether or not the digest can reliably be used to infer consistency
* issues between the repaired sets across replicas.
*
* If either repaired status tracking is not active or the command has not yet been
* executed, then this will always return true.
*
* @return {@code true} unless {@link #markInconclusive()} has been called, i.e. whether the
* digest of the repaired data can reliably be used to infer inconsistencies between the
* repaired sets across replicas
*/
boolean isConclusive()
{
return isConclusive;
}
/**
* Records that the repaired data digest should not be relied upon. After this call
* {@link #isConclusive()} returns {@code false}; nothing in this class resets the flag.
*/
void markInconclusive()
{
isConclusive = false;
}
/**
 * Points the purger at a newly wrapped partition (its key and iteration order) and
 * remembers that partition as the current one.
 *
 * @param partition the tracked partition that is about to be consumed
 */
private void onNewPartition(UnfilteredRowIterator partition)
{
    assert purger != null;

    final DecoratedKey key = partition.partitionKey();
    purger.setCurrentKey(key);

    final boolean reversed = partition.isReverseOrder();
    purger.setIsReverseOrder(reversed);

    currentPartition = partition;
}
/**
 * Returns the digest for the partition currently being processed, creating it on first use.
 *
 * @return the (possibly freshly created) per-partition digest; never {@code null}
 */
private Digest getPerPartitionDigest()
{
    Digest current = perPartitionDigest;
    if (current == null)
    {
        current = Digest.forRepairedDataTracking();
        perPartitionDigest = current;
    }
    return current;
}
/**
 * Wraps a partition iterator so that every partition it yields is passed through
 * {@link #withRepairedDataInfo(UnfilteredRowIterator)}.
 *
 * @param iterator the partitions to track
 * @return the iterator with the per-partition tracking transformation applied
 */
public UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator)
{
    final Transformation<UnfilteredRowIterator> perPartitionTracking = new Transformation<UnfilteredRowIterator>()
    {
        @Override
        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
        {
            // Delegate to the row-level overload on the enclosing instance.
            return RepairedDataInfo.this.withRepairedDataInfo(partition);
        }
    };
    return Transformation.apply(iterator, perPartitionTracking);
}
/**
 * Wraps a single partition so that, as it is consumed, its contents are folded into the
 * repaired data digest.
 * <p>
 * The data handed downstream is never altered: the purger is applied only to obtain the
 * version of each deletion, marker and row that goes into the digest. If the repaired
 * counter is already done the iterator is returned untouched.
 *
 * @param iterator the partition to track
 * @return the tracked (and counted) partition, or {@code iterator} itself if the counter is done
 */
public UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator)
{
    if (repairedCounter.isDone())
        return iterator;

    class DigestTracker extends Transformation<UnfilteredRowIterator>
    {
        @Override
        protected DecoratedKey applyToPartitionKey(DecoratedKey key)
        {
            final Digest digest = getPerPartitionDigest();
            digest.update(key.getKey());
            return key;
        }

        @Override
        protected DeletionTime applyToDeletion(DeletionTime deletionTime)
        {
            if (!repairedCounter.isDone())
            {
                assert purger != null;
                final DeletionTime afterPurge = purger.applyToDeletion(deletionTime);
                if (!afterPurge.isLive())
                    isFullyPurged = false;
                // The partition deletion is digested whether or not it is live.
                afterPurge.digest(getPerPartitionDigest());
            }
            return deletionTime;
        }

        @Override
        protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
        {
            if (!repairedCounter.isDone())
            {
                assert purger != null;
                final RangeTombstoneMarker afterPurge = purger.applyToMarker(marker);
                if (afterPurge != null)
                {
                    isFullyPurged = false;
                    afterPurge.digest(getPerPartitionDigest());
                }
            }
            return marker;
        }

        @Override
        protected Row applyToStatic(Row row)
        {
            // Static rows are digested exactly like regular rows.
            return applyToRow(row);
        }

        @Override
        protected Row applyToRow(Row row)
        {
            if (repairedCounter.isDone())
                return row;

            assert purger != null;
            final Row afterPurge = purger.applyToRow(row);
            if (afterPurge == null || afterPurge.isEmpty())
                return row;

            isFullyPurged = false;
            afterPurge.digest(getPerPartitionDigest());
            return row;
        }

        @Override
        protected void onPartitionClose()
        {
            final Digest finishedPartition = perPartitionDigest;
            if (finishedPartition != null)
            {
                // Only partitions that the purger did not empty completely
                // contribute to the digest for the command as a whole.
                if (!isFullyPurged)
                {
                    if (perCommandDigest == null)
                        perCommandDigest = Digest.forRepairedDataTracking();
                    final byte[] digestBytes = finishedPartition.digest();
                    perCommandDigest.update(digestBytes, 0, digestBytes.length);
                }
                perPartitionDigest = null;
            }
            isFullyPurged = true;
        }
    }

    final UnfilteredRowIterator digested = Transformation.apply(iterator, new DigestTracker());
    final UnfilteredRowIterator tracked = repairedCounter.applyTo(digested);
    onNewPartition(tracked);
    return tracked;
}
/**
 * Wraps the given partitions so that, once the limits of the read have been reached but the
 * repaired counter is not yet done, further repaired data is consumed purely for the
 * purpose of digest tracking. No additional rows are ever returned to the caller.
 * <p>
 * The over-read drains the current tracked partition first and then any partitions supplied
 * via {@code finalize}, stopping as soon as the repaired counter is done. The number of rows
 * and the time taken are recorded in the table metrics and traced.
 *
 * @param partitions the partitions produced by the read
 * @param limit the counter for the limits of the read itself
 * @return {@code partitions} unchanged if {@code prepare} was never called or the repaired
 *         counter is already done, otherwise {@code partitions} with the over-read
 *         transformation applied
 */
public UnfilteredPartitionIterator extend(final UnfilteredPartitionIterator partitions,
                                          final DataLimits.Counter limit)
{
    class OverreadRepairedData extends Transformation<UnfilteredRowIterator> implements MoreRows<UnfilteredRowIterator>
    {
        @Override
        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
        {
            return MoreRows.extend(partition, this, partition.columns());
        }

        @Override
        public UnfilteredRowIterator moreContents()
        {
            // We don't need to do anything until the DataLimits of the
            // read have been reached
            if (!limit.isDone() || repairedCounter.isDone())
                return null;

            long countBeforeOverreads = repairedCounter.counted();
            long overreadStartTime = System.nanoTime();

            if (currentPartition != null)
                consumePartition(currentPartition, repairedCounter);

            if (postLimitPartitions != null)
                while (postLimitPartitions.hasNext() && !repairedCounter.isDone())
                    consumePartition(postLimitPartitions.next(), repairedCounter);

            // we're not actually providing any more rows, just consuming the repaired data
            long rows = repairedCounter.counted() - countBeforeOverreads;
            long nanos = System.nanoTime() - overreadStartTime;
            metrics.repairedDataTrackingOverreadRows.update(rows);
            metrics.repairedDataTrackingOverreadTime.update(nanos, TimeUnit.NANOSECONDS);
            // The elapsed time is converted to microseconds, so label it as such.
            Tracing.trace("Read {} additional rows of repaired data for tracking in {}us", rows, TimeUnit.NANOSECONDS.toMicros(nanos));
            return null;
        }

        /**
         * Iterates the partition until it is exhausted or the counter is done, then closes it.
         * The partition is closed even if iteration throws.
         */
        private void consumePartition(UnfilteredRowIterator partition, DataLimits.Counter counter)
        {
            if (partition == null)
                return;

            try (UnfilteredRowIterator toConsume = partition)
            {
                while (!counter.isDone() && toConsume.hasNext())
                    toConsume.next();
            }
        }
    }

    // If the read didn't touch any sstables prepare() hasn't been called and
    // we can skip this transformation
    if (metrics == null || repairedCounter.isDone())
        return partitions;

    return Transformation.apply(partitions, new OverreadRepairedData());
}
/**
* Although PurgeFunction extends Transformation, this is never applied to an iterator.
* Instead, it is used by RepairedDataInfo during the generation of a repaired data
* digest to exclude data which will actually be purged later on in the read pipeline.
*/
private static class RepairedDataPurger extends PurgeFunction
{
RepairedDataPurger(ColumnFamilyStore cfs,
int nowInSec,
int oldestUnrepairedTombstone)
{
super(nowInSec,
cfs.gcBefore(nowInSec),
oldestUnrepairedTombstone,
cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
cfs.metadata.get().enforceStrictLiveness());
}
protected LongPredicate getPurgeEvaluator()
{
return (time) -> true;
}
void setCurrentKey(DecoratedKey key)
{
super.onNewPartition(key);
}
void setIsReverseOrder(boolean isReverseOrder)
{
super.setReverseOrder(isReverseOrder);
}
public DeletionTime applyToDeletion(DeletionTime deletionTime)
{
return super.applyToDeletion(deletionTime);
}
public Row applyToRow(Row row)
{
return super.applyToRow(row);
}
public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
{
return super.applyToMarker(marker);
}
}
}