blob: e7e49521c164c21f242d0d50cf660bc7f127293d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.reads;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;
import com.google.common.base.Joiner;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
import org.apache.cassandra.db.transform.Filter;
import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
import static com.google.common.collect.Iterables.*;
/**
 * Resolves full (non-digest) data responses from replicas for a single read, merging them into one
 * result {@link PartitionIterator} that honors the query limits. Depending on configuration, the
 * merge is augmented with read repair, short read protection, replica filtering protection (for
 * filtered/index queries) and repaired-data digest tracking.
 *
 * @param <E> the endpoint collection type of the replica plan
 * @param <P> the concrete read replica-plan type
 */
public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>> extends ResponseResolver<E, P>
{
    // Whether merged rows must satisfy strict liveness rules (from the table metadata).
    private final boolean enforceStrictLiveness;
    // Strategy used to repair divergent replica responses observed during the merge.
    private final ReadRepair<E, P> readRepair;
    // When true, responses may carry repaired-data digests which are recorded and verified.
    private final boolean trackRepairedStatus;

    public DataResolver(ReadCommand command, Supplier<? extends P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
    {
        // Default: do not track repaired data status.
        this(command, replicaPlan, readRepair, queryStartNanoTime, false);
    }

    public DataResolver(ReadCommand command, Supplier<? extends P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime, boolean trackRepairedStatus)
    {
        super(command, replicaPlan, queryStartNanoTime);
        this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
        this.readRepair = readRepair;
        this.trackRepairedStatus = trackRepairedStatus;
    }

    /**
     * Returns the data from the first received response only, filtered for the query's nowInSec.
     * No merging or reconciliation is performed — callers use this when a single response suffices.
     */
    public PartitionIterator getData()
    {
        ReadResponse response = responses.get(0).payload;
        return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
    }

    /** @return true if at least one response has been received. */
    public boolean isDataPresent()
    {
        return !responses.isEmpty();
    }

    public PartitionIterator resolve()
    {
        return resolve(null);
    }

    /**
     * Merges all currently-received responses into a single result iterator, applying read repair,
     * short read protection and (if needed) replica filtering protection.
     *
     * @param runOnShortRead optional callback invoked whenever a short read is detected
     */
    public PartitionIterator resolve(@Nullable Runnable runOnShortRead)
    {
        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
        // at the beginning of this method), so grab the response count once and use that through the method.
        Collection<Message<ReadResponse>> messages = responses.snapshot();
        // This resolver only handles full data responses; digests are handled elsewhere.
        assert !any(messages, msg -> msg.payload.isDigestResponse());

        E replicas = replicaPlan().readCandidates().select(transform(messages, Message::from), false);

        // If requested, inspect each response for a digest of the replica's repaired data set
        RepairedDataTracker repairedDataTracker = trackRepairedStatus
                                                  ? new RepairedDataTracker(getRepairedDataVerifier(command))
                                                  : null;
        if (repairedDataTracker != null)
        {
            messages.forEach(msg -> {
                // Only full replicas contribute conclusive repaired-data digests.
                if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from()).isFull())
                {
                    repairedDataTracker.recordDigest(msg.from(),
                                                     msg.payload.repairedDataDigest(),
                                                     msg.payload.isRepairedDigestConclusive());
                }
            });
        }

        if (!needsReplicaFilteringProtection())
        {
            ResolveContext context = new ResolveContext(replicas);
            return resolveWithReadRepair(context,
                                         i -> shortReadProtectedResponse(i, context, runOnShortRead),
                                         UnaryOperator.identity(),
                                         repairedDataTracker);
        }

        // Filtered queries can return stale rows that up-to-date replicas have filtered out, so
        // they need the more elaborate two-phase resolution.
        return resolveWithReplicaFilteringProtection(replicas, repairedDataTracker);
    }

    /**
     * Determines whether this query needs replica filtering protection: any query with a row filter
     * does, unless it is served by a custom index that declares it doesn't need it.
     */
    private boolean needsReplicaFilteringProtection()
    {
        if (command.rowFilter().isEmpty())
            return false;

        IndexMetadata indexMetadata = command.indexMetadata();
        if (indexMetadata == null || !indexMetadata.isCustom())
        {
            // Non-index (or built-in index) filtering always needs protection.
            return true;
        }

        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(command.metadata().id);
        assert cfs != null;
        Index index = command.getIndex(cfs);
        assert index != null;
        // Let the custom index decide whether its filtering semantics require protection.
        return index.supportsReplicaFilteringProtection(command.rowFilter());
    }

    /**
     * Per-resolution state: the replicas whose responses are being merged and the post-merge
     * result counter enforcing the query limits.
     */
    private class ResolveContext
    {
        private final E replicas;
        private final DataLimits.Counter mergedResultCounter;

        private ResolveContext(E replicas)
        {
            this.replicas = replicas;
            this.mergedResultCounter = command.limits().newCounter(command.nowInSec(),
                                                                   true,
                                                                   command.selectsFullPartition(),
                                                                   enforceStrictLiveness);
        }

        private boolean needsReadRepair()
        {
            // A single response cannot diverge from itself.
            return replicas.size() > 1;
        }

        private boolean needShortReadProtection()
        {
            // If we have only one result, there is no read repair to do and we can't get short reads
            // Also, so-called "short reads" stem from nodes returning only a subset of the results they have for a
            // partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have a limit,
            // don't bother protecting against short reads.
            return replicas.size() > 1 && !command.limits().isUnlimited();
        }
    }

    /** Supplies the (possibly protected/wrapped) response iterator for the i-th replica. */
    @FunctionalInterface
    private interface ResponseProvider
    {
        UnfilteredPartitionIterator getResponse(int i);
    }

    /**
     * Returns the i-th response, wrapped with short read protection when the context requires it.
     *
     * @param onShortRead optional callback invoked when a short read triggers a follow-up query
     */
    private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context, @Nullable Runnable onShortRead)
    {
        UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command);

        return context.needShortReadProtection()
               ? ShortReadProtection.extend(context.replicas.get(i),
                                            // The stored response is stale once we re-query the replica.
                                            () -> { responses.clearUnsafe(i); if (onShortRead != null) onShortRead.run(); },
                                            originalResponse,
                                            command,
                                            context.mergedResultCounter,
                                            queryStartNanoTime,
                                            enforceStrictLiveness)
               : originalResponse;
    }

    /**
     * Merges the per-replica responses, attaching a read-repair merge listener when more than one
     * replica responded (and repair isn't a no-op).
     *
     * @param preCountFilter transformation applied to the merged result before the limit counter
     */
    private PartitionIterator resolveWithReadRepair(ResolveContext context,
                                                    ResponseProvider responseProvider,
                                                    UnaryOperator<PartitionIterator> preCountFilter,
                                                    RepairedDataTracker repairedDataTracker)
    {
        UnfilteredPartitionIterators.MergeListener listener = null;
        if (context.needsReadRepair() && readRepair != NoopReadRepair.instance)
        {
            P sources = replicaPlan.get().withContacts(context.replicas);
            listener = wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker);
        }

        return resolveInternal(context, listener, responseProvider, preCountFilter);
    }

    @SuppressWarnings("resource")
    private PartitionIterator resolveWithReplicaFilteringProtection(E replicas, RepairedDataTracker repairedDataTracker)
    {
        // Protecting against inconsistent replica filtering (some replica returning a row that is outdated but that
        // wouldn't be removed by normal reconciliation because up-to-date replica have filtered the up-to-date version
        // of that row) involves 3 main elements:
        //   1) We combine short-read protection and a merge listener that identifies potentially "out-of-date"
        //      rows to create an iterator that is guaranteed to produce enough valid row results to satisfy the query
        //      limit if enough actually exist. A row is considered out-of-date if its merged form is non-empty and we
        //      receive no response from at least one replica. In this case, it is possible that filtering at the
        //      "silent" replica has produced a more up-to-date result.
        //   2) This iterator is passed to the standard resolution process with read-repair, but is first wrapped in a
        //      response provider that lazily "completes" potentially out-of-date rows by directly querying them on the
        //      replicas that were previously silent. As this iterator is consumed, it caches valid data for potentially
        //      out-of-date rows, and this cached data is merged with the fetched data as rows are requested. If there
        //      is no replica divergence, only rows in the partition being evaluated will be cached (then released
        //      when the partition is consumed).
        //   3) After a "complete" row is materialized, it must pass the row filter supplied by the original query
        //      before it counts against the limit.

        // We need separate contexts, as each context has its own counter
        ResolveContext firstPhaseContext = new ResolveContext(replicas);
        ResolveContext secondPhaseContext = new ResolveContext(replicas);

        ReplicaFilteringProtection<E> rfp = new ReplicaFilteringProtection<>(replicaPlan().keyspace(),
                                                                             command,
                                                                             replicaPlan().consistencyLevel(),
                                                                             queryStartNanoTime,
                                                                             firstPhaseContext.replicas,
                                                                             DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(),
                                                                             DatabaseDescriptor.getCachedReplicaRowsFailThreshold());

        // Phase 1: merge with the RFP controller so out-of-date candidate rows are identified/cached.
        PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
                                                                 rfp.mergeController(),
                                                                 i -> shortReadProtectedResponse(i, firstPhaseContext, null),
                                                                 UnaryOperator.identity());

        // Phase 2: lazily complete protected rows from previously-silent replicas, then re-apply
        // the row filter before counting against the limit.
        PartitionIterator completedPartitions = resolveWithReadRepair(secondPhaseContext,
                                                                      i -> rfp.queryProtectedPartitions(firstPhasePartitions, i),
                                                                      results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec()),
                                                                      repairedDataTracker);

        // Ensure that the RFP instance has a chance to record metrics when the iterator closes.
        return PartitionIterators.doOnClose(completedPartitions, firstPhasePartitions::close);
    }

    /**
     * Core merge pipeline: collect each replica's response, merge, filter, count, and discard
     * empty partitions — in exactly that order (see comment below for why).
     */
    @SuppressWarnings("resource")
    private PartitionIterator resolveInternal(ResolveContext context,
                                              UnfilteredPartitionIterators.MergeListener mergeListener,
                                              ResponseProvider responseProvider,
                                              UnaryOperator<PartitionIterator> preCountFilter)
    {
        int count = context.replicas.size();
        List<UnfilteredPartitionIterator> results = new ArrayList<>(count);
        for (int i = 0; i < count; i++)
            results.add(responseProvider.getResponse(i));

        /*
         * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
         * have more rows than the client requested. To make sure that we still conform to the original limit,
         * we apply a top-level post-reconciliation counter to the merged partition iterator.
         *
         * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied
         * to the current partition to work. For this reason we have to apply the counter transformation before
         * empty partition discard logic kicks in - for it will eagerly consume the iterator.
         *
         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
         *
         * See CASSANDRA-13747 for more details.
         */
        UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, mergeListener);
        Filter filter = new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness());
        FilteredPartitions filtered = FilteredPartitions.filter(merged, filter);
        PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter);
        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
    }

    // Overridable for testing: supplies the verifier used by the repaired-data tracker.
    protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
    {
        return RepairedDataVerifier.verifier(command);
    }

    /** Builds a per-replica dump of responses for the given key, used in merge-failure diagnostics. */
    private String makeResponsesDebugString(DecoratedKey partitionKey)
    {
        return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from() + " => " + m.payload.toDebugString(command, partitionKey)));
    }

    /**
     * Wraps the read-repair merge listener so that any AssertionError raised during merging is
     * rethrown with rich diagnostic context (merged value, per-source versions, raw responses),
     * and so the repaired-data tracker (if any) is verified on close.
     */
    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
                                                                         P sources,
                                                                         RepairedDataTracker repairedDataTracker)
    {
        // Avoid wrapping no-op listener as it doesn't throw, unless we're tracking repaired status
        // in which case we need to inject the tracker & verify on close
        if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP)
        {
            if (repairedDataTracker == null)
                return partitionListener;

            return new UnfilteredPartitionIterators.MergeListener()
            {
                public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
                {
                    return UnfilteredRowIterators.MergeListener.NOOP;
                }

                public void close()
                {
                    repairedDataTracker.verify();
                }
            };
        }

        return new UnfilteredPartitionIterators.MergeListener()
        {
            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
            {
                UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions);

                return new UnfilteredRowIterators.MergeListener()
                {
                    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
                    {
                        try
                        {
                            rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions);
                        }
                        catch (AssertionError e)
                        {
                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
                            // rather get more info to debug than not.
                            TableMetadata table = command.metadata();
                            String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
                                                           table,
                                                           mergedDeletion == null ? "null" : mergedDeletion.toString(),
                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
                                                           sources.contacts(),
                                                           makeResponsesDebugString(partitionKey));
                            throw new AssertionError(details, e);
                        }
                    }

                    public Row onMergedRows(Row merged, Row[] versions)
                    {
                        try
                        {
                            return rowListener.onMergedRows(merged, versions);
                        }
                        catch (AssertionError e)
                        {
                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
                            // rather get more info to debug than not.
                            TableMetadata table = command.metadata();
                            String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
                                                           table,
                                                           merged == null ? "null" : merged.toString(table),
                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
                                                           sources.contacts(),
                                                           makeResponsesDebugString(partitionKey));
                            throw new AssertionError(details, e);
                        }
                    }

                    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
                    {
                        try
                        {
                            // The code for merging range tombstones is a tad complex and we had the assertions there triggered
                            // unexpectedly in a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights
                            // when that happens without more context than what the assertion errors give us however, hence the
                            // catch here that basically gathers as much context as reasonable.
                            rowListener.onMergedRangeTombstoneMarkers(merged, versions);
                        }
                        catch (AssertionError e)
                        {
                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
                            // rather get more info to debug than not.
                            TableMetadata table = command.metadata();
                            String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
                                                           table,
                                                           merged == null ? "null" : merged.toString(table),
                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
                                                           sources.contacts(),
                                                           makeResponsesDebugString(partitionKey));
                            throw new AssertionError(details, e);
                        }
                    }

                    public void close()
                    {
                        rowListener.close();
                    }
                };
            }

            public void close()
            {
                partitionListener.close();
                if (repairedDataTracker != null)
                    repairedDataTracker.verify();
            }
        };
    }
}