/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.service;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Columns;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionColumns;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.btree.BTreeSet;

/**
 * Helper in charge of collecting additional queries to be done on the coordinator to protect against invalid results
 * being included due to replica-side filtering (secondary indexes or {@code ALLOW FILTERING}).
 * <p>
 * When using replica-side filtering with CL > ONE, a replica can send a stale result satisfying the filter, while
 * updated replicas won't send a corresponding tombstone to discard that result during reconciliation. This helper
 * identifies the rows in a replica response that don't have a corresponding row in other replica responses, and
 * requests them by primary key from the "silent" replicas in a second fetch round.
 * <p>
 * See CASSANDRA-8272 and CASSANDRA-8273 for further details.
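 * <p>
 * The intended two-phase usage is roughly the following sketch; the actual call sites live in the coordinator-side
 * resolver that owns this helper, and {@code perReplicaResults} and {@code protectedResults} are illustrative names:
 * <pre>{@code
 * ReplicaFilteringProtection rfp =
 *     new ReplicaFilteringProtection(keyspace, command, consistency, queryStartNanoTime, sources);
 *
 * // Phase 1: merge the per-replica results with the protection's listener, which caches every version seen
 * // and records the primary keys of potentially outdated rows. The merged iterator must be fully consumed
 * // for the listener to observe all rows.
 * UnfilteredPartitionIterators.merge(perReplicaResults, command.nowInSec(), rfp.mergeController());
 *
 * // Phase 2: replay each replica's cached results, completed with the rows fetched from it in a second round.
 * for (int i = 0; i < sources.length; i++)
 *     protectedResults.add(rfp.queryProtectedPartitions(i));
 * }</pre>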
 */
class ReplicaFilteringProtection
{
    private static final Logger logger = LoggerFactory.getLogger(ReplicaFilteringProtection.class);

    private final Keyspace keyspace;
    private final ReadCommand command;
    private final ConsistencyLevel consistency;
    private final long queryStartNanoTime;
    private final InetAddress[] sources;
    private final TableMetrics tableMetrics;

    /**
     * Per-source primary keys of the rows that might be outdated and therefore need to be fetched.
     * For an outdated static row we use an empty builder to signal that it has to be queried.
     */
    private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch;

    /**
     * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows.
     */
    private final List<List<PartitionBuilder>> originalPartitions;

    ReplicaFilteringProtection(Keyspace keyspace,
                               ReadCommand command,
                               ConsistencyLevel consistency,
                               long queryStartNanoTime,
                               InetAddress[] sources)
    {
        this.keyspace = keyspace;
        this.command = command;
        this.consistency = consistency;
        this.queryStartNanoTime = queryStartNanoTime;
        this.sources = sources;
        this.rowsToFetch = new ArrayList<>(sources.length);
        this.originalPartitions = new ArrayList<>(sources.length);
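
        // initialize one "rows to fetch" map and one cached partition list per queried replica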
        for (InetAddress ignored : sources)
        {
            rowsToFetch.add(new TreeMap<>());
            originalPartitions.add(new ArrayList<>());
        }

        tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId);
    }

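    /**
     * Returns the set builder used to collect the clusterings to fetch from the given replica for the given
     * partition, creating it if it doesn't exist yet. The mere existence of a builder (even an empty one) marks
     * the partition as having something to fetch, which is how outdated static rows are signalled.
     */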
    private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source, DecoratedKey partitionKey)
    {
        return rowsToFetch.get(source).computeIfAbsent(partitionKey, k -> BTreeSet.builder(command.metadata().comparator));
    }

    /**
     * Returns the protected results for the specified replica. These are generated by fetching the extra rows and
     * merging them with the cached original filtered results for that replica.
     *
     * @param source the index of the replica in {@link #sources}
     * @return the protected results for the specified replica
     */
    UnfilteredPartitionIterator queryProtectedPartitions(int source)
    {
        UnfilteredPartitionIterator original = makeIterator(originalPartitions.get(source));
        SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = rowsToFetch.get(source);

        if (toFetch.isEmpty())
            return original;

        // TODO: this would be more efficient if we had multi-key queries internally
        List<UnfilteredPartitionIterator> fetched = toFetch.keySet()
                                                           .stream()
                                                           .map(k -> querySourceOnKey(source, k))
                                                           .collect(Collectors.toList());
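
        // merge the cached original results with the extra rows fetched in the second round; no merge listener
        // is needed here since we only want the combined output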
        return UnfilteredPartitionIterators.merge(Arrays.asList(original, UnfilteredPartitionIterators.concat(fetched)),
                                                  command.nowInSec(), null);
    }

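    /**
     * Fetches the rows previously flagged as potentially outdated for the given partition from the given replica,
     * querying them by primary key.
     */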
    private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey key)
    {
        BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key);
        assert builder != null; // We're calling this on the result of rowsToFetch.get(i).keySet()

        InetAddress source = sources[i];
        NavigableSet<Clustering> clusterings = builder.build();
        tableMetrics.replicaSideFilteringProtectionRequests.mark();

        if (logger.isTraceEnabled())
            logger.trace("Requesting rows {} in partition {} from {} for replica-side filtering protection",
                         clusterings, key, source);

        Tracing.trace("Requesting {} rows in partition {} from {} for replica-side filtering protection",
                      clusterings.size(), key, source);

        // build the read command, taking into account that we could be requesting only the static row
        DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : DataLimits.NONE;
        ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed());
        SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
                                                                           command.nowInSec(),
                                                                           command.columnFilter(),
                                                                           RowFilter.NONE,
                                                                           limits,
                                                                           key,
                                                                           filter);
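
        // if the fetch fails, report the failure at the consistency level of the original user query rather
        // than at the internal CL.ONE used for this protection read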
        try
        {
            return executeReadCommand(cmd, source);
        }
        catch (ReadTimeoutException e)
        {
            int blockFor = consistency.blockFor(keyspace);
            throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
        }
        catch (UnavailableException e)
        {
            int blockFor = consistency.blockFor(keyspace);
            throw new UnavailableException(consistency, blockFor, blockFor - 1);
        }
    }

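    /**
     * Executes the given single-partition read against the given replica at {@code CL.ONE}, either locally or
     * through messaging, and returns the raw response as an unfiltered iterator.
     */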
    private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, InetAddress source)
    {
        DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime);
        ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);

        if (StorageProxy.canDoLocalRequest(source))
            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
        else
            MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);

        // We don't call handler.get() because we want to preserve tombstones
        handler.awaitResults();
        assert resolver.responses.size() == 1;
        return resolver.responses.get(0).payload.makeIterator(command);
    }

    /**
     * Returns a merge listener that skips the merged rows for which any of the replicas hasn't sent a version,
     * pessimistically assuming that they are outdated. It is intended to be used during a first merge of per-replica
     * query results, to guarantee that we look at enough results from the replicas and don't miss any potentially
     * outdated row.
     * <p>
     * The listener will track both the accepted data and the primary keys of the rows that are considered potentially
     * outdated. That way, once the query results have been merged using this listener, further calls to
     * {@link #queryProtectedPartitions(int)} will use the collected data to return a copy of the data originally
     * collected from the specified replica, completed with the potentially outdated rows.
     */
    UnfilteredPartitionIterators.MergeListener mergeController()
    {
        return (partitionKey, versions) -> {

            PartitionBuilder[] builders = new PartitionBuilder[sources.length];

            for (int i = 0; i < sources.length; i++)
                builders[i] = new PartitionBuilder(partitionKey, columns(versions), stats(versions));

            return new UnfilteredRowIterators.MergeListener()
            {
                @Override
                public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
                {
                    // cache the deletion time versions to be able to regenerate the original row iterator
                    for (int i = 0; i < versions.length; i++)
                        builders[i].setDeletionTime(versions[i]);
                }

                @Override
                public Row onMergedRows(Row merged, Row[] versions)
                {
                    // cache the row versions to be able to regenerate the original row iterator
                    for (int i = 0; i < versions.length; i++)
                        builders[i].addRow(versions[i]);

                    if (merged.isEmpty())
                        return merged;

                    boolean isPotentiallyOutdated = false;
                    boolean isStatic = merged.isStatic();
                    for (int i = 0; i < versions.length; i++)
                    {
                        Row version = versions[i];
                        if (version == null || (isStatic && version.isEmpty()))
                        {
                            isPotentiallyOutdated = true;
                            BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey);
                            // Note that for static rows, we shouldn't add the clustering to the clustering set (the
                            // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact
                            // that we created a builder in the first place will act as a marker that the static row
                            // must be fetched, even if no other rows are added for this partition.
                            if (!isStatic)
                                toFetch.add(merged.clustering());
                        }
                    }

                    // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be
                    // an outdated result that is only present because other replicas have filtered the up-to-date
                    // result out), then we skip the row. In other words, the results of the initial merging of results
                    // by this protection assume the worst-case scenario where every row that might be outdated
                    // actually is. This ensures that during this first phase (collecting additional rows to fetch) we
                    // are guaranteed to look at enough data to ultimately fulfill the query limit.
                    return isPotentiallyOutdated ? null : merged;
                }

                @Override
                public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
                {
                    // cache the marker versions to be able to regenerate the original row iterator
                    for (int i = 0; i < versions.length; i++)
                        builders[i].addRangeTombstoneMarker(versions[i]);
                }

                @Override
                public void close()
                {
                    for (int i = 0; i < sources.length; i++)
                        originalPartitions.get(i).add(builders[i]);
                }
            };
        };
    }

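    /**
     * Returns the union of the partition columns of all the given per-replica iterators, ignoring null versions.
     */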
    private static PartitionColumns columns(List<UnfilteredRowIterator> versions)
    {
        Columns statics = Columns.NONE;
        Columns regulars = Columns.NONE;
        for (UnfilteredRowIterator iter : versions)
        {
            if (iter == null)
                continue;

            PartitionColumns cols = iter.columns();
            statics = statics.mergeTo(cols.statics);
            regulars = regulars.mergeTo(cols.regulars);
        }
        return new PartitionColumns(statics, regulars);
    }

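    /**
     * Returns the merged encoding stats of all the given per-replica iterators, ignoring null versions.
     */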
    private static EncodingStats stats(List<UnfilteredRowIterator> iterators)
    {
        EncodingStats stats = EncodingStats.NO_STATS;
        for (UnfilteredRowIterator iter : iterators)
        {
            if (iter == null)
                continue;

            stats = stats.mergeWith(iter.stats());
        }
        return stats;
    }

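    /**
     * Wraps the cached per-partition builders of a single replica in an unfiltered partition iterator over the
     * partitions originally returned by that replica.
     */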
    private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder> builders)
    {
        return new UnfilteredPartitionIterator()
        {
            final Iterator<PartitionBuilder> iterator = builders.iterator();

            @Override
            public boolean isForThrift()
            {
                return command.isForThrift();
            }

            @Override
            public CFMetaData metadata()
            {
                return command.metadata();
            }

            @Override
            public void close()
            {
                // nothing to do here
            }

            @Override
            public boolean hasNext()
            {
                return iterator.hasNext();
            }

            @Override
            public UnfilteredRowIterator next()
            {
                return iterator.next().build();
            }
        };
    }

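    /**
     * Accumulator for the version of a single partition as seen by a single replica: its deletion time, static row,
     * and rows and range tombstone markers in clustering order. The original unfiltered iterator for that replica
     * can be rebuilt from it with {@link #build()}.
     */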
    private class PartitionBuilder
    {
        private final DecoratedKey partitionKey;
        private final PartitionColumns columns;
        private final EncodingStats stats;

        private DeletionTime deletionTime;
        private Row staticRow = Rows.EMPTY_STATIC_ROW;
        private final List<Unfiltered> contents = new ArrayList<>();

        private PartitionBuilder(DecoratedKey partitionKey, PartitionColumns columns, EncodingStats stats)
        {
            this.partitionKey = partitionKey;
            this.columns = columns;
            this.stats = stats;
        }

        private void setDeletionTime(DeletionTime deletionTime)
        {
            this.deletionTime = deletionTime;
        }

        private void addRow(Row row)
        {
            if (row == null)
                return;

            if (row.isStatic())
                staticRow = row;
            else
                contents.add(row);
        }

        private void addRangeTombstoneMarker(RangeTombstoneMarker marker)
        {
            if (marker != null)
                contents.add(marker);
        }

        private UnfilteredRowIterator build()
        {
            return new UnfilteredRowIterator()
            {
                final Iterator<Unfiltered> iterator = contents.iterator();

                @Override
                public DeletionTime partitionLevelDeletion()
                {
                    return deletionTime;
                }

                @Override
                public EncodingStats stats()
                {
                    return stats;
                }

                @Override
                public CFMetaData metadata()
                {
                    return command.metadata();
                }

                @Override
                public boolean isReverseOrder()
                {
                    return command.isReversed();
                }

                @Override
                public PartitionColumns columns()
                {
                    return columns;
                }

                @Override
                public DecoratedKey partitionKey()
                {
                    return partitionKey;
                }

                @Override
                public Row staticRow()
                {
                    return staticRow;
                }

                @Override
                public void close()
                {
                    // nothing to do here
                }

                @Override
                public boolean hasNext()
                {
                    return iterator.hasNext();
                }

                @Override
                public Unfiltered next()
                {
                    return iterator.next();
                }
            };
        }
    }
}