* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.service.reads;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.UnaryOperator;
import org.apache.cassandra.cql3.statements.schema.IndexTarget;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
import org.apache.cassandra.db.transform.Filter;
import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.index.sasi.SASIIndex;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.TableMetadata;
import static*;
public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
// Strict-liveness setting taken from the command's table metadata when the resolver is constructed.
// NOTE(review): not read anywhere in the visible part of this file; resolveInternal queries
// command.metadata().enforceStrictLiveness() directly instead -- confirm whether this field is still needed.
private final boolean enforceStrictLiveness;
// Read-repair implementation; resolveWithReadRepair asks it for the merge listener.
private final ReadRepair<E, P> readRepair;
/**
 * Creates a data resolver for the given read command.
 *
 * @param command the read command being resolved; forwarded to the superclass constructor
 * @param replicaPlan the shared replica plan; forwarded to the superclass constructor
 * @param readRepair the read-repair implementation stored for use during resolution
 * @param queryStartNanoTime forwarded to the superclass constructor (presumably the query start time in
 *                           nanoseconds, judging by the name -- confirm)
 */
public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
super(command, replicaPlan, queryStartNanoTime);
this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
this.readRepair = readRepair;
/**
 * Returns the data of the first collected response only; no merge across responses happens here.
 * <p>
 * The payload of {@code responses.get(0)} is turned into an iterator for {@code command} and passed through
 * {@code UnfilteredPartitionIterators.filter} using {@code command.nowInSec()}.
 * <p>
 * NOTE(review): there is no guard for an empty {@code responses}; presumably callers check
 * {@link #isDataPresent()} first -- confirm.
 *
 * @return a filtered partition iterator over the first response
 */
public PartitionIterator getData()
ReadResponse response = responses.get(0).payload;
return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
/**
 * Tells whether at least one response has been collected.
 *
 * @return {@code true} if {@code responses} is not empty
 */
public boolean isDataPresent()
return !responses.isEmpty();
/**
 * Resolves the data responses collected so far into a single partition iterator.
 * <p>
 * Works on a snapshot of the responses, asserts that none of them is a digest response, and selects the
 * replicas that sent them from the replica plan's candidates. When the command tracks repaired status, a
 * {@code RepairedDataTracker} is created and the responses are scanned for those that may include a repaired
 * digest and come from a full replica. The result is then produced by {@code resolveWithReadRepair} over
 * short-read-protected responses or, when {@code needsReplicaFilteringProtection()} is true, by
 * {@code resolveWithReplicaFilteringProtection}.
 *
 * @return the resolved partition iterator
 */
public PartitionIterator resolve()
// We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
// at the beginning of this method), so take a snapshot of the responses once and use it throughout the method.
Collection<Message<ReadResponse>> messages = responses.snapshot();
assert !any(messages, msg -> msg.payload.isDigestResponse());
// Restrict the plan's candidates to the replicas we actually got a response from.
E replicas = replicaPlan().candidates().select(transform(messages, Message::from), false);
// If requested, inspect each response for a digest of the replica's repaired data set
RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
? new RepairedDataTracker(getRepairedDataVerifier(command))
: null;
if (repairedDataTracker != null)
messages.forEach(msg -> {
if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from()).isFull())
if (!needsReplicaFilteringProtection())
ResolveContext context = new ResolveContext(replicas);
return resolveWithReadRepair(context,
i -> shortReadProtectedResponse(i, context),
return resolveWithReplicaFilteringProtection(replicas, repairedDataTracker);
/**
 * Decides whether this read must go through replica filtering protection.
 * <p>
 * Protection is never needed when the command's row filter is empty. When the command uses a custom index,
 * protection is needed unless the index's custom class option names {@code SASIIndex}. In every other case
 * with a non-empty row filter, protection is needed.
 *
 * @return {@code true} if {@code resolve()} should use {@code resolveWithReplicaFilteringProtection}
 */
private boolean needsReplicaFilteringProtection()
if (command.rowFilter().isEmpty())
return false;
IndexMetadata indexDef = command.indexMetadata();
if (indexDef != null && indexDef.isCustom())
// Compare by class name: only a custom index whose implementation is SASIIndex is exempt.
String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
return !SASIIndex.class.getName().equals(className);
return true;
/**
 * State for one resolution pass: the replicas whose responses are merged and the counter applied to the
 * merged result. Each instance creates its own counter from the command's limits, so separate passes need
 * separate contexts.
 */
private class ResolveContext
// Replicas whose responses take part in this pass.
private final E replicas;
// Counter built from command.limits(); resolveInternal applies it to the merged partitions.
private final DataLimits.Counter mergedResultCounter;
private ResolveContext(E replicas)
this.replicas = replicas;
this.mergedResultCounter = command.limits().newCounter(command.nowInSec(),
/** Read repair only makes sense when more than one replica responded. */
private boolean needsReadRepair()
return replicas.size() > 1;
/** Short read protection is needed only with more than one replica and a limited query. */
private boolean needShortReadProtection()
// If we have only one result, there is no read repair to do and we can't get short reads.
// Also, so-called "short reads" stem from nodes returning only a subset of the results they have for a
// partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have a limit,
// don't bother protecting against short reads.
return replicas.size() > 1 && !command.limits().isUnlimited();
/**
 * Supplies the response iterator to merge for a given replica position. resolveInternal takes one of these so
 * that callers can decide how each response is produced (for example via shortReadProtectedResponse).
 */
private interface ResponseProvider
// i is the index of the response/replica to return the iterator for.
UnfilteredPartitionIterator getResponse(int i);
/**
 * Returns the iterator for the i-th response, extended with short read protection when the context requires it.
 *
 * @param i index of the response in {@code responses}; also used to look up the replica in the context
 * @param context the resolution context that decides whether short read protection is needed
 * @return the iterator returned by {@code ShortReadProtection.extend} when
 *         {@code context.needShortReadProtection()} is true, otherwise the original response iterator
 */
private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context)
UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command);
return context.needShortReadProtection()
? ShortReadProtection.extend(context.replicas.get(i),
: originalResponse;
/**
 * Merges the responses through resolveInternal, attaching a read-repair merge listener when the context
 * involves more than one replica.
 *
 * @param context the resolution context (replicas and merged-result counter)
 * @param responseProvider supplies the iterator for each replica position
 * @param preCountFilter applied to the merged, filtered partitions before they are counted
 * @param repairedDataTracker passed to wrapMergeListener together with the read-repair listener; may be
 *                            {@code null}. It is not used when the context does not need read repair.
 * @return the resolved partition iterator
 */
private PartitionIterator resolveWithReadRepair(ResolveContext context,
ResponseProvider responseProvider,
UnaryOperator<PartitionIterator> preCountFilter,
RepairedDataTracker repairedDataTracker)
// Stays null when read repair is not needed; resolveInternal then merges with a null listener.
UnfilteredPartitionIterators.MergeListener listener = null;
if (context.needsReadRepair())
P sources = replicaPlan.getWithContacts(context.replicas);
listener = wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker);
return resolveInternal(context, listener, responseProvider, preCountFilter);
/**
 * Resolves the responses in two passes so that rows filtered out by some replicas cannot let an outdated
 * row from another replica through. The first pass goes through resolveInternal over short-read-protected
 * responses; the second goes through resolveWithReadRepair and applies the command's row filter before
 * counting.
 *
 * @param replicas the replicas whose responses are being resolved
 * @param repairedDataTracker tracker for repaired data digests; may be {@code null}
 * @return the partition iterator produced by the second pass
 */
private PartitionIterator resolveWithReplicaFilteringProtection(E replicas, RepairedDataTracker repairedDataTracker)
// Protecting against inconsistent replica filtering (some replica returning a row that is outdated but that
// wouldn't be removed by normal reconciliation because up-to-date replicas have filtered the up-to-date version
// of that row) works in 3 steps:
// 1) we read the full response just to collect rows that may be outdated (the ones we got from some
// replica but didn't get any response for from others; it could be that those other replicas have filtered a
// more up-to-date result). In doing so, we do not count any such "potentially outdated" row towards the
// query limit. This simulates the worst-case scenario where all those "potentially outdated" rows are
// indeed outdated, and thus makes sure we are guaranteed to read enough results (thanks to short read
// protection).
// 2) we query all the replicas/rows we need to determine whether those "potentially outdated" rows are outdated
// or not.
// 3) we re-read cached copies of each replica response using the "normal" read path merge with read-repair,
// but where for each replica we use their original response _plus_ the additional rows queried in the
// previous step (and apply the command#rowFilter() on the full result). Since the first phase has
// pessimistically collected enough results for the case where all potentially outdated results are indeed
// outdated, we shouldn't need further short-read protection requests during this phase.
// We need separate contexts, as each context has its own counter
ResolveContext firstPhaseContext = new ResolveContext(replicas);
ResolveContext secondPhaseContext = new ResolveContext(replicas);
ReplicaFilteringProtection<E> rfp = new ReplicaFilteringProtection<>(replicaPlan().keyspace(),
PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
i -> shortReadProtectedResponse(i, firstPhaseContext),
// Consume the first phase partitions to populate the replica filtering protection with both those materialized
// partitions and the primary keys to be fetched.
// After reading the entire query results, the protection helper should have cached all the partitions, so we
// can clear the responses accumulator for the sake of memory usage, given that the second phase might take
// long if it needs to query replicas.
return resolveWithReadRepair(secondPhaseContext,
results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec()),
/**
 * Merges the per-replica responses and applies the post-merge pipeline, in this order: merge, filter rows,
 * apply {@code preCountFilter}, count against the context's counter, discard empty partitions.
 *
 * @param context supplies the replica count and the merged-result counter
 * @param mergeListener listener handed to {@code UnfilteredPartitionIterators.merge}; resolveWithReadRepair
 *                      passes {@code null} when no read repair is needed
 * @param responseProvider supplies the iterator for each replica position
 * @param preCountFilter applied to the filtered partitions before the counter
 * @return the counted partitions with empty partitions discarded
 */
private PartitionIterator resolveInternal(ResolveContext context,
UnfilteredPartitionIterators.MergeListener mergeListener,
ResponseProvider responseProvider,
UnaryOperator<PartitionIterator> preCountFilter)
int count = context.replicas.size();
List<UnfilteredPartitionIterator> results = new ArrayList<>(count);
// NOTE(review): the loop body is not visible here; presumably it adds responseProvider.getResponse(i)
// to results for each replica -- confirm.
for (int i = 0; i < count; i++)
* Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
* have more rows than the client requested. To make sure that we still conform to the original limit,
* we apply a top-level post-reconciliation counter to the merged partition iterator.
* Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied
* to the current partition to work. For this reason we have to apply the counter transformation before
* empty partition discard logic kicks in - for it will eagerly consume the iterator.
* That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
* See CASSANDRA-13747 for more details.
UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, mergeListener);
FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
// The counter must be applied before EmptyPartitionsDiscarder; see the ordering explanation above.
PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter);
return Transformation.apply(counted, new EmptyPartitionsDiscarder());
/**
 * Returns the verifier handed to the {@code RepairedDataTracker} created in {@code resolve()}.
 * Delegates to {@code RepairedDataVerifier.verifier(command)}. The method is protected, so a subclass can
 * supply a different verifier (presumably for tests -- confirm).
 *
 * @param command the read command to build the verifier for
 * @return the verifier for {@code command}
 */
protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
return RepairedDataVerifier.verifier(command);
/**
 * Builds a debug description of the current responses for one partition: one
 * {@code "<sender> => <payload debug string>"} entry per message in the snapshot, joined with {@code ",\n"}.
 *
 * @param partitionKey the partition key passed to each payload's {@code toDebugString}
 * @return the joined debug string
 */
private String makeResponsesDebugString(DecoratedKey partitionKey)
return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from() + " => " + m.payload.toDebugString(command, partitionKey)));
private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
P sources,
RepairedDataTracker repairedDataTracker)
// Avoid wrapping no-op listener as it doesn't throw, unless we're tracking repaired status
// in which case we need to inject the tracker & verify on close
if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP)
if (repairedDataTracker == null)
return partitionListener;
return new UnfilteredPartitionIterators.MergeListener()
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
return UnfilteredRowIterators.MergeListener.NOOP;
public void close()
return new UnfilteredPartitionIterators.MergeListener()
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions);
return new UnfilteredRowIterators.MergeListener()
public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions);
catch (AssertionError e)
// The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
// rather get more info to debug than not.
TableMetadata table = command.metadata();
String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
mergedDeletion == null ? "null" : mergedDeletion.toString(),
'[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
throw new AssertionError(details, e);
public Row onMergedRows(Row merged, Row[] versions)
return rowListener.onMergedRows(merged, versions);
catch (AssertionError e)
// The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
// rather get more info to debug than not.
TableMetadata table = command.metadata();
String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
merged == null ? "null" : merged.toString(table),
'[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
throw new AssertionError(details, e);
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
// The code for merging range tombstones is a tad complex and we had the assertions there triggered
// unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights
// when that happens without more context than what the assertion errors give us, however, hence the
// catch here that basically gathers as much context as reasonable.
rowListener.onMergedRangeTombstoneMarkers(merged, versions);
catch (AssertionError e)
// The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
// rather get more info to debug than not.
TableMetadata table = command.metadata();
String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
merged == null ? "null" : merged.toString(table),
'[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
throw new AssertionError(details, e);
public void close()
public void close()
if (repairedDataTracker != null)