| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.service; |
| |
| import java.net.InetAddress; |
| import java.util.*; |
| import java.util.concurrent.TimeoutException; |
| import java.util.function.UnaryOperator; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Joiner; |
| import com.google.common.collect.Iterables; |
| |
| import org.apache.cassandra.concurrent.Stage; |
| import org.apache.cassandra.concurrent.StageManager; |
| import org.apache.cassandra.config.*; |
| import org.apache.cassandra.cql3.statements.IndexTarget; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.filter.*; |
| import org.apache.cassandra.db.filter.DataLimits.Counter; |
| import org.apache.cassandra.db.partitions.*; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.db.transform.*; |
| import org.apache.cassandra.dht.AbstractBounds; |
| import org.apache.cassandra.dht.ExcludingBounds; |
| import org.apache.cassandra.dht.Range; |
| import org.apache.cassandra.exceptions.ReadTimeoutException; |
| import org.apache.cassandra.index.sasi.SASIIndex; |
| import org.apache.cassandra.net.*; |
| import org.apache.cassandra.schema.IndexMetadata; |
| import org.apache.cassandra.tracing.Tracing; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| public class DataResolver extends ResponseResolver |
| { |
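| // If set (e.g. by passing -Dcassandra.drop_oversized_readrepair_mutations=true to the JVM), read repair |
| // mutations that exceed the maximum mutation size are logged and dropped instead of failing the read with |
| // a ReadTimeoutException; see sendRepairMutation() below. |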
| private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS = |
| Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); |
| |
| @VisibleForTesting |
| final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); |
| private final long queryStartNanoTime; |
| private final boolean enforceStrictLiveness; |
| |
| DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime) |
| { |
| super(keyspace, command, consistency, maxResponseCount); |
| this.queryStartNanoTime = queryStartNanoTime; |
| this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); |
| } |
| |
| public PartitionIterator getData() |
| { |
| ReadResponse response = responses.iterator().next().payload; |
| return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec()); |
| } |
| |
| public boolean isDataPresent() |
| { |
| return !responses.isEmpty(); |
| } |
| |
| public void compareResponses() |
| { |
| // We need to fully consume the results to trigger read repairs if appropriate |
| try (PartitionIterator iterator = resolve()) |
| { |
| PartitionIterators.consume(iterator); |
| } |
| } |
| |
| public PartitionIterator resolve() |
| { |
| if (!needsReplicaFilteringProtection()) |
| { |
| ResolveContext context = new ResolveContext(responses.size()); |
| return resolveWithReadRepair(context, |
| i -> shortReadProtectedResponse(i, context), |
| UnaryOperator.identity()); |
| } |
| |
| return resolveWithReplicaFilteringProtection(); |
| } |
| |
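| /** |
|  * Whether this query requires the two-phase, replica-filtering-protected read path. Any command with a |
|  * non-empty row filter (e.g. a filtering or secondary index query) qualifies; the one exception is a custom |
|  * SASI index, which is excluded from the protection here. |
|  */ |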
| private boolean needsReplicaFilteringProtection() |
| { |
| if (command.rowFilter().isEmpty()) |
| return false; |
| |
| IndexMetadata indexDef = command.indexMetadata(); |
| if (indexDef != null && indexDef.isCustom()) |
| { |
| String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME); |
| return !SASIIndex.class.getName().equals(className); |
| } |
| |
| return true; |
| } |
| |
| private class ResolveContext |
| { |
| private final InetAddress[] sources; |
| private final DataLimits.Counter mergedResultCounter; |
| |
| private ResolveContext(int count) |
| { |
| assert count <= responses.size(); |
| this.sources = new InetAddress[count]; |
| for (int i = 0; i < count; i++) |
| sources[i] = responses.get(i).from; |
| this.mergedResultCounter = command.limits().newCounter(command.nowInSec(), |
| true, |
| command.selectsFullPartition(), |
| enforceStrictLiveness); |
| } |
| |
| private boolean needShortReadProtection() |
| { |
| // If we have only one result, there is no read repair to do, and we can't get short reads. |
| // Also, so-called "short reads" stem from nodes returning only a subset of the results they have for a |
| // partition due to the limit, with that subset turning out not to be enough post-reconciliation. So if we |
| // have no limit, don't bother protecting against short reads. |
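| // As a hypothetical illustration: with LIMIT 2, replica A may return rows [r1, r2] while replica B |
| // returns tombstones shadowing both; post-reconciliation we would have no live rows even though A may |
| // well hold further live rows beyond r2 that it never sent. |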
| return sources.length > 1 && !command.limits().isUnlimited(); |
| } |
| } |
| |
| @FunctionalInterface |
| private interface ResponseProvider |
| { |
| UnfilteredPartitionIterator getResponse(int i); |
| } |
| |
| private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context) |
| { |
| UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command); |
| |
| return context.needShortReadProtection() |
| ? extendWithShortReadProtection(originalResponse, context.sources[i], context.mergedResultCounter) |
| : originalResponse; |
| } |
| |
| private PartitionIterator resolveWithReadRepair(ResolveContext context, |
| ResponseProvider responseProvider, |
| UnaryOperator<PartitionIterator> preCountFilter) |
| { |
| return resolveInternal(context, new RepairMergeListener(context.sources), responseProvider, preCountFilter); |
| } |
| |
| private PartitionIterator resolveWithReplicaFilteringProtection() |
| { |
| // Protecting against inconsistent replica filtering (a replica returning a row that is outdated but that |
| // wouldn't be removed by normal reconciliation because the up-to-date replicas have filtered out the |
| // up-to-date version of that row) works in 3 steps: |
| //   1) we read the full response just to collect the rows that may be outdated (the ones we got from some |
| //      replicas but got no response for from others; it could be that those other replicas have filtered out |
| //      a more up-to-date result). In doing so, we do not count any such "potentially outdated" rows towards |
| //      the query limit. This simulates the worst-case scenario where all those "potentially outdated" rows |
| //      are indeed outdated, and thus makes sure we are guaranteed to read enough results (thanks to short |
| //      read protection). |
| //   2) we query all the replicas/rows we need in order to determine whether those "potentially outdated" |
| //      rows are outdated or not. |
| //   3) we re-read cached copies of each replica response using the "normal" read path merge with read repair, |
| //      but where for each replica we use its original response _plus_ the additional rows queried in the |
| //      previous step (and apply the command#rowFilter() on the full result). Since the first phase has |
| //      pessimistically collected enough results for the case where all potentially outdated results are |
| //      indeed outdated, we shouldn't need further short-read-protection requests during this phase. |
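| // As a hypothetical illustration of the problem: for a query filtering on v = 1 with replicas A and B, |
| // A may return a stale row (k, v=1) while B, which holds the up-to-date row (k, v=2), filters it out and |
| // returns nothing for k; naive reconciliation would then incorrectly keep A's stale row in the result. |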
| |
| // We could get more responses while this method runs, which is ok (we're happy to ignore any response not |
| // present at the beginning of this method), so grab the response count once and use it throughout the method. |
| int count = responses.size(); |
| // We need separate contexts, as each context has its own counter |
| ResolveContext firstPhaseContext = new ResolveContext(count); |
| ResolveContext secondPhaseContext = new ResolveContext(count); |
| ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, queryStartNanoTime, firstPhaseContext.sources); |
| PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext, |
| rfp.mergeController(), |
| i -> shortReadProtectedResponse(i, firstPhaseContext), |
| UnaryOperator.identity()); |
| |
| // Consume the first phase partitions to populate the replica filtering protection with both those materialized |
| // partitions and the primary keys to be fetched. |
| PartitionIterators.consume(firstPhasePartitions); |
| firstPhasePartitions.close(); |
| |
| // After reading the entire query result, the protection helper should have cached all the partitions, so we |
| // can clear the responses accumulator to save memory, given that the second phase might take a long time if |
| // it needs to query replicas. |
| responses.clearUnsafe(); |
| |
| return resolveWithReadRepair(secondPhaseContext, |
| rfp::queryProtectedPartitions, |
| results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec())); |
| } |
| |
| private PartitionIterator resolveInternal(ResolveContext context, |
| UnfilteredPartitionIterators.MergeListener mergeListener, |
| ResponseProvider responseProvider, |
| UnaryOperator<PartitionIterator> preCountFilter) |
| { |
| int count = context.sources.length; |
| List<UnfilteredPartitionIterator> results = new ArrayList<>(count); |
| for (int i = 0; i < count; i++) |
| results.add(responseProvider.getResponse(i)); |
| |
| /* |
| * Even though every response, individually, will honor the limit, it is possible that we will, after the merge, |
| * have more rows than the client requested. To make sure that we still conform to the original limit, |
| * we apply a top-level post-reconciliation counter to the merged partition iterator. |
| * |
| * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter being applied |
| * to the current partition in order to work. For this reason we have to apply the counter transformation |
| * before the empty partition discard logic kicks in, as the latter will eagerly consume the iterator. |
| * |
| * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions |
| * |
| * See CASSANDRA-13747 for more details. |
| */ |
| |
| UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, command.nowInSec(), mergeListener); |
| FilteredPartitions filtered = |
| FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); |
| PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter); |
| |
| return command.isForThrift() |
| ? counted |
| : Transformation.apply(counted, new EmptyPartitionsDiscarder()); |
| } |
| |
| private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener |
| { |
| private final InetAddress[] sources; |
| |
| private RepairMergeListener(InetAddress[] sources) |
| { |
| this.sources = sources; |
| } |
| |
| public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) |
| { |
| return new MergeListener(partitionKey, columns(versions), isReversed(versions)); |
| } |
| |
| private PartitionColumns columns(List<UnfilteredRowIterator> versions) |
| { |
| Columns statics = Columns.NONE; |
| Columns regulars = Columns.NONE; |
| for (UnfilteredRowIterator iter : versions) |
| { |
| if (iter == null) |
| continue; |
| |
| PartitionColumns cols = iter.columns(); |
| statics = statics.mergeTo(cols.statics); |
| regulars = regulars.mergeTo(cols.regulars); |
| } |
| return new PartitionColumns(statics, regulars); |
| } |
| |
| private boolean isReversed(List<UnfilteredRowIterator> versions) |
| { |
| for (UnfilteredRowIterator iter : versions) |
| { |
| if (iter == null) |
| continue; |
| |
| // Everything will be in the same order |
| return iter.isReverseOrder(); |
| } |
| |
| assert false : "Expected at least one iterator"; |
| return false; |
| } |
| |
| public void close() |
| { |
| try |
| { |
| FBUtilities.waitOnFutures(repairResults, DatabaseDescriptor.getWriteRpcTimeout()); |
| } |
| catch (TimeoutException ex) |
| { |
| // We got all responses, but timed out while repairing |
| int blockFor = consistency.blockFor(keyspace); |
| if (Tracing.isTracing()) |
| Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); |
| else |
| logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor); |
| |
| throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); |
| } |
| } |
| |
| private class MergeListener implements UnfilteredRowIterators.MergeListener |
| { |
| private final DecoratedKey partitionKey; |
| private final PartitionColumns columns; |
| private final boolean isReversed; |
| private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length]; |
| |
| private final Row.Builder[] currentRows = new Row.Builder[sources.length]; |
| private final RowDiffListener diffListener; |
| |
| // The partition level deletion of the merged partition. |
| private DeletionTime partitionLevelDeletion; |
| // When the merged result has a currently open range tombstone marker, its deletion time; null otherwise. |
| private DeletionTime mergedDeletionTime; |
| // For each source, the time of the current deletion as known by the source. |
| private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length]; |
| // For each source, record if there is an open range to send as repair, and from where. |
| private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length]; |
| |
| private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed) |
| { |
| this.partitionKey = partitionKey; |
| this.columns = columns; |
| this.isReversed = isReversed; |
| |
| this.diffListener = new RowDiffListener() |
| { |
| public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) |
| { |
| if (merged != null && !merged.equals(original)) |
| currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged); |
| } |
| |
| public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) |
| { |
| if (merged != null && !merged.equals(original)) |
| currentRow(i, clustering).addRowDeletion(merged); |
| } |
| |
| public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original) |
| { |
| if (merged != null && !merged.equals(original)) |
| currentRow(i, clustering).addComplexDeletion(column, merged); |
| } |
| |
| public void onCell(int i, Clustering clustering, Cell merged, Cell original) |
| { |
| if (merged != null && !merged.equals(original) && isQueried(merged)) |
| currentRow(i, clustering).addCell(merged); |
| } |
| |
| private boolean isQueried(Cell cell) |
| { |
| // When we read, we may have some cells that have been fetched but are not selected by the user. Those cells |
| // may have empty values as an optimization (see CASSANDRA-10655) and hence should not be included in the |
| // read repair. This is fine since those columns are not actually requested by the user and are only present |
| // for the sake of CQL semantics (making sure we can always distinguish a row that doesn't exist from one |
| // that does exist but has no value for the column requested by the user), so it won't be unexpected by the |
| // user that those columns are not repaired. |
| ColumnDefinition column = cell.column(); |
| ColumnFilter filter = command.columnFilter(); |
| return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column); |
| } |
| }; |
| } |
| |
| private PartitionUpdate update(int i) |
| { |
| if (repairs[i] == null) |
| repairs[i] = new PartitionUpdate(command.metadata(), partitionKey, columns, 1); |
| return repairs[i]; |
| } |
| |
| /** |
|  * The partition level deletion with which source {@code i} is currently repaired, or |
|  * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was |
|  * up to date on it). The output of this method is only valid after the call to |
|  * {@link #onMergedPartitionLevelDeletion}. |
|  */ |
| private DeletionTime partitionLevelRepairDeletion(int i) |
| { |
| return repairs[i] == null ? DeletionTime.LIVE : repairs[i].partitionLevelDeletion(); |
| } |
| |
| private Row.Builder currentRow(int i, Clustering clustering) |
| { |
| if (currentRows[i] == null) |
| { |
| currentRows[i] = BTreeRow.sortedBuilder(); |
| currentRows[i].newRow(clustering); |
| } |
| return currentRows[i]; |
| } |
| |
| public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) |
| { |
| this.partitionLevelDeletion = mergedDeletion; |
| for (int i = 0; i < versions.length; i++) |
| { |
| if (mergedDeletion.supersedes(versions[i])) |
| update(i).addPartitionDeletion(mergedDeletion); |
| } |
| } |
| |
| public Row onMergedRows(Row merged, Row[] versions) |
| { |
| // If a row was shadowed post merge, it must be by a partition level or range tombstone, and we handle |
| // those cases directly in their respective methods (in other words, it would be inefficient to send a row |
| // deletion as repair when we know we've already sent a partition level or range tombstone that covers it). |
| if (merged.isEmpty()) |
| return merged; |
| |
| Rows.diff(diffListener, merged, versions); |
| for (int i = 0; i < currentRows.length; i++) |
| { |
| if (currentRows[i] != null) |
| update(i).add(currentRows[i].build()); |
| } |
| Arrays.fill(currentRows, null); |
| |
| return merged; |
| } |
| |
| private DeletionTime currentDeletion() |
| { |
| return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime; |
| } |
| |
| public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) |
| { |
| try |
| { |
| // The code for merging range tombstones is a tad complex, and we have had the assertions there trigger |
| // unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insight when that |
| // happens without more context than what the assertion errors give us, however, hence the catch here, |
| // which basically gathers as much context as is reasonable. |
| internalOnMergedRangeTombstoneMarkers(merged, versions); |
| } |
| catch (AssertionError e) |
| { |
| // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd |
| // rather get more info to debug with than not. |
| CFMetaData table = command.metadata(); |
| String details = String.format("Error merging RTs on %s.%s: command=%s, reversed=%b, merged=%s, versions=%s, sources={%s}, responses:%n %s", |
| table.ksName, table.cfName, |
| command.toCQLString(), |
| isReversed, |
| merged == null ? "null" : merged.toString(table), |
| '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', |
| Arrays.toString(sources), |
| makeResponsesDebugString()); |
| throw new AssertionError(details, e); |
| } |
| } |
| |
| private String makeResponsesDebugString() |
| { |
| return Joiner.on(",\n") |
| .join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey))); |
| } |
| |
| private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) |
| { |
| // The current deletion as we start dealing with this marker. |
| DeletionTime currentDeletion = currentDeletion(); |
| |
| for (int i = 0; i < versions.length; i++) |
| { |
| RangeTombstoneMarker marker = versions[i]; |
| |
| // Update what the source now thinks is the current deletion |
| if (marker != null) |
| sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null; |
| |
| // If merged == null, some of the sources are opening or closing a marker |
| if (merged == null) |
| { |
| // but if it's not this source, move to the next one |
| if (marker == null) |
| continue; |
| |
| // We have a close and/or open marker for a source, with nothing corresponding in merged. |
| // Because merged is a superset, this implies that we have a current deletion (be it due to an |
| // early opening in merged or to a partition level deletion) and that this deletion will still be |
| // active after that point. Further, whatever deletion was open or is opened by this marker on the |
| // source, that deletion cannot supersede the current one. |
| // |
| // But while the marker deletion (before and/or after this point) cannot supersede the current |
| // deletion, we want to know if it's equal to it (both before and after), because in that case |
| // the source is up to date and we don't want to include it in the repair. |
| // |
| // So in practice we have 2 possible cases: |
| //  1) the source was up-to-date on the deletion up to that point: then it won't be from that point |
| //     on, unless it's a boundary and the newly opened deletion time is also equal to the current |
| //     deletion (note that this implies the boundary has the same closing and opening deletion |
| //     time, which should generally not happen, but can happen because legacy reading code did not |
| //     avoid it for a while; see CASSANDRA-13237). |
| //  2) the source wasn't up-to-date on the deletion up to that point, and it may now be (if it |
| //     isn't, we just have nothing to do for that marker). |
| assert !currentDeletion.isLive() : currentDeletion.toString(); |
| |
| // Is the source up to date on the deletion? It's up to date if it has neither an open RT repair |
| // nor an "active" partition level deletion (where "active" means greater than or equal |
| // to the current deletion: if the source has a repaired partition deletion lower than the |
| // current deletion, the current deletion is due to a previously open range tombstone, |
| // and if the source isn't currently being repaired for that RT, then it is up to date on it). |
| DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i); |
| if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion)) |
| { |
| /* |
| * Since there is an ongoing merged deletion, the only way we don't have an open repair for |
| * this source is that it had a range open with the same deletion as the current one, |
| * and the marker is closing it. |
| */ |
| assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)) |
| : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); |
| |
| // and so unless it's a boundary whose opening deletion time is still equal to the current |
| // deletion (see comment above for why this can actually happen), we have to repair the source |
| // from that point on. |
| if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))) |
| markerToRepair[i] = marker.closeBound(isReversed).invert(); |
| } |
| // In case 2) above, we only have something to do if the source is up-to-date after that point |
| // (which, since the source isn't up-to-date before that point, means we're opening a new deletion |
| // that is equal to the current one). |
| else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))) |
| { |
| closeOpenMarker(i, marker.openBound(isReversed).invert()); |
| } |
| } |
| else |
| { |
| // We have a change of current deletion in merged (potentially to/from no deletion at all). |
| |
| if (merged.isClose(isReversed)) |
| { |
| // We're closing the merged range. If we've recorded that this range should be repaired for |
| // the source, close it and add said range to the repair to send. |
| if (markerToRepair[i] != null) |
| closeOpenMarker(i, merged.closeBound(isReversed)); |
| } |
| |
| if (merged.isOpen(isReversed)) |
| { |
| // If we're opening a new merged range (or just switching deletion), then unless the source |
| // is up to date on that deletion (note that we've updated what the source deletion is |
| // above), we'll have to send the range to the source. |
| DeletionTime newDeletion = merged.openDeletionTime(isReversed); |
| DeletionTime sourceDeletion = sourceDeletionTime[i]; |
| if (!newDeletion.equals(sourceDeletion)) |
| markerToRepair[i] = merged.openBound(isReversed); |
| } |
| } |
| } |
| |
| if (merged != null) |
| mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null; |
| } |
| |
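| // Sends, as repair for source i, the range spanning [markerToRepair[i], close] under the current deletion. |
| // For example (forward order, hypothetical bounds): with markerToRepair[i] = INCL_START(3) and |
| // close = INCL_END(5), source i receives a RangeTombstone covering the slice [3, 5]. |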
| private void closeOpenMarker(int i, ClusteringBound close) |
| { |
| ClusteringBound open = markerToRepair[i]; |
| update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion())); |
| markerToRepair[i] = null; |
| } |
| |
| public void close() |
| { |
| for (int i = 0; i < repairs.length; i++) |
| if (null != repairs[i]) |
| sendRepairMutation(repairs[i], sources[i]); |
| } |
| |
| private void sendRepairMutation(PartitionUpdate partition, InetAddress destination) |
| { |
| Mutation mutation = new Mutation(partition); |
| int messagingVersion = MessagingService.instance().getVersion(destination); |
| |
| int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion); |
| int maxMutationSize = DatabaseDescriptor.getMaxMutationSize(); |
| |
| if (mutationSize <= maxMutationSize) |
| { |
| Tracing.trace("Sending read-repair-mutation to {}", destination); |
| // use a separate verb here to avoid writing hints on timeouts |
| MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR); |
| repairResults.add(MessagingService.instance().sendRR(message, destination)); |
| ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark(); |
| } |
| else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS) |
| { |
| logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}", |
| mutationSize, |
| maxMutationSize, |
| command.metadata().ksName, |
| command.metadata().cfName, |
| command.metadata().getKeyValidator().getString(partitionKey.getKey()), |
| destination); |
| } |
| else |
| { |
| logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}", |
| mutationSize, |
| maxMutationSize, |
| command.metadata().ksName, |
| command.metadata().cfName, |
| command.metadata().getKeyValidator().getString(partitionKey.getKey()), |
| destination); |
| |
| int blockFor = consistency.blockFor(keyspace); |
| Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); |
| throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); |
| } |
| } |
| } |
| } |
| |
| private UnfilteredPartitionIterator extendWithShortReadProtection(UnfilteredPartitionIterator partitions, |
| InetAddress source, |
| DataLimits.Counter mergedResultCounter) |
| { |
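| // A per-source counter used purely for observation: onlyCount() makes it count rows without ever stopping |
| // iteration, so short read protection can tell how many rows each individual replica actually returned. |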
| DataLimits.Counter singleResultCounter = |
| command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount(); |
| |
| ShortReadPartitionsProtection protection = |
| new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter, queryStartNanoTime); |
| |
| /* |
| * The order of extension and transformations is important here. Extending with more partitions has to happen |
| * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will |
| * be called on the first partition of the extended iterator. |
| * |
| * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method |
| * will be called last, after the extension done by the SRPP.applyToPartition() call. That way we preserve the |
| * same order when it comes to calling the SRPP.moreContents() and applyToRow() callbacks. |
| * |
| * See ShortReadPartitionsProtection.applyToPartition() for more details. |
| */ |
| |
| // extend with moreContents() only if it's a range read command with no partition key specified |
| if (!command.isLimitedToOnePartition()) |
| partitions = MorePartitions.extend(partitions, protection); // register SRPP.moreContents() |
| |
| partitions = Transformation.apply(partitions, protection); // register SRPP.applyToPartition() |
| partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter |
| |
| return partitions; |
| } |
| |
| /* |
| * We have a potential short read if the result from a given node contains the requested number of rows |
| * (i.e. it has stopped returning results due to the limit), but some of them haven't |
| * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones. |
| * |
| * If that is the case, then that node may have more rows that we should fetch, as otherwise we could |
| * ultimately return fewer rows than required. Also, those additional rows may contain tombstones, which |
| * we also need to fetch as they may shadow rows or partitions from other replicas' results, which we would |
| * otherwise return incorrectly. |
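| * |
| * Hypothetical example: with LIMIT 10 across two replicas, each returns its first 10 rows; if three of one |
| * replica's rows are shadowed by the other's tombstones during reconciliation, only seven survive the merge, |
| * so we go back to the first replica (which may hold live rows beyond those it already sent) for more. |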
| */ |
| private class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator> |
| { |
| private final InetAddress source; |
| |
| private final DataLimits.Counter singleResultCounter; // unmerged per-source counter |
| private final DataLimits.Counter mergedResultCounter; // merged end-result counter |
| |
| private DecoratedKey lastPartitionKey; // key of the last observed partition |
| |
| private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call |
| |
| private final long queryStartNanoTime; |
| |
| private ShortReadPartitionsProtection(InetAddress source, |
| DataLimits.Counter singleResultCounter, |
| DataLimits.Counter mergedResultCounter, |
| long queryStartNanoTime) |
| { |
| this.source = source; |
| this.singleResultCounter = singleResultCounter; |
| this.mergedResultCounter = mergedResultCounter; |
| this.queryStartNanoTime = queryStartNanoTime; |
| } |
| |
| @Override |
| public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) |
| { |
| partitionsFetched = true; |
| |
| lastPartitionKey = partition.partitionKey(); |
| |
| /* |
| * Extend with moreContents() first, then apply the protection transformation so that lastClustering is |
| * tracked by applyToRow(). |
| * |
| * If we don't apply the transformation *after* extending the partition with MoreRows, the |
| * applyToRow() method of the protection will not be called on the first row of the new extension iterator. |
| */ |
| ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.metadata(), partition.partitionKey()); |
| return Transformation.apply(MoreRows.extend(partition, protection), protection); |
| } |
| |
| /* |
| * We only get here once all the rows and partitions in this iterator have been iterated over, and so |
| * if the node had returned the requested number of rows but we still get here, then some results were |
| * skipped during reconciliation. |
| */ |
| public UnfilteredPartitionIterator moreContents() |
| { |
| // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit |
| assert !mergedResultCounter.isDone(); |
| |
| // we do not apply short read protection when we have no limits at all |
| assert !command.limits().isUnlimited(); |
| |
| /* |
| * If this is a single partition read command or an (indexed) partition range read command with |
| * a partition key specified, then we can't and shouldn't try to fetch more partitions. |
| */ |
| assert !command.isLimitedToOnePartition(); |
| |
| /* |
| * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more. |
| * |
| * Can only take the shortcut if there is no per partition limit set. Otherwise it's possible to hit false |
| * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911). |
| */ |
| if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) |
| return null; |
| |
| /* |
| * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator. |
| * There is no point in asking the replica for more rows - it has no more in the requested range. |
| */ |
| if (!partitionsFetched) |
| return null; |
| partitionsFetched = false; |
| |
| /* |
| * We are going to fetch one partition at a time for thrift and potentially more for CQL. |
| * The row limit will be set either to the per partition limit - if the command has no total row limit - or |
| * to the total # of rows remaining - if it has one. If we don't grab enough rows in some of the partitions, |
| * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones. |
| */ |
| int toQuery = command.limits().count() != DataLimits.NO_LIMIT |
| ? command.limits().count() - counted(mergedResultCounter) |
| : command.limits().perPartitionCount(); |
| |
| ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark(); |
| Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source); |
| |
| PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery); |
| return executeReadCommand(cmd); |
| } |
| |
| // Counts the number of rows for regular queries and the number of groups for GROUP BY queries |
| private int counted(Counter counter) |
| { |
| return command.limits().isGroupByLimit() |
| ? counter.rowCounted() |
| : counter.counted(); |
| } |
| |
| private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery) |
| { |
| PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command; |
| |
| DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery); |
| |
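| // Resume the scan from just past the last partition key already seen. For example (hypothetical keys): if |
| // the original key range was (min, max] and we last saw key k, the retry covers (k, max]; an originally |
| // right-exclusive range (min, max) likewise becomes (k, max). |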
| AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange(); |
| AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight() |
| ? new Range<>(lastPartitionKey, bounds.right) |
| : new ExcludingBounds<>(lastPartitionKey, bounds.right); |
| DataRange newDataRange = cmd.dataRange().forSubRange(newBounds); |
| |
| return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange); |
| } |
| |
| private class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator> |
| { |
| private final CFMetaData metadata; |
| private final DecoratedKey partitionKey; |
| |
| private Clustering lastClustering; // clustering of the last observed row |
| |
| private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows |
| private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command) |
| private int lastQueried = 0; // # extra rows requested from the replica last time |
| |
| private ShortReadRowsProtection(CFMetaData metadata, DecoratedKey partitionKey) |
| { |
| this.metadata = metadata; |
| this.partitionKey = partitionKey; |
| } |
| |
| @Override |
| public Row applyToRow(Row row) |
| { |
| lastClustering = row.clustering(); |
| return row; |
| } |
| |
| /* |
| * We only get here once all the rows in this iterator have been iterated over, and so if the node |
| * had returned the requested number of rows but we still get here, then some results were skipped |
| * during reconciliation. |
| */ |
| public UnfilteredRowIterator moreContents() |
| { |
| // never try to request additional rows from replicas if our reconciled partition is already filled to the limit |
| assert !mergedResultCounter.isDoneForPartition(); |
| |
| // we do not apply short read protection when we have no limits at all |
| assert !command.limits().isUnlimited(); |
| |
| /* |
| * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more. |
| * |
| * Can only take the shortcut if there is no per partition limit set. Otherwise it's possible to hit false |
| * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911). |
| */ |
| if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) |
| return null; |
| |
| /* |
| * If the replica has no live rows in the partition, don't try to fetch more. |
| * |
| * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't |
| * always cover this scenario: |
| * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit], |
| * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition. |
| * |
| * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch |
| * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only |
| * have tombstones in the current partition. |
| * |
| * One other way we can hit this condition is when the partition only has a live static row and no regular |
| * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after |
| * the moreContents() call. |
| */ |
| if (countedInCurrentPartition(singleResultCounter) == 0) |
| return null; |
| |
| /* |
| * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering. |
| * We already have the row, so there is no point in asking for more from the partition. |
| */ |
| if (Clustering.EMPTY == lastClustering) |
| return null; |
| |
| lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted; |
| lastCounted = countedInCurrentPartition(singleResultCounter); |
| |
| // getting back fewer rows than we asked for means the partition on the replica has been fully consumed |
| if (lastQueried > 0 && lastFetched < lastQueried) |
| return null; |
| |
| /* |
| * At this point we know that: |
| * 1. the replica returned (potentially repeatedly) as many rows as we asked for and potentially has more |
| * rows in the partition |
| * 2. at least one of those returned rows was shadowed by a tombstone returned from another |
| * replica |
| * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to |
| * avoid a short read |
| * |
| * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b |
| * are defined as follows: |
| * [a] limits.count() - mergedResultCounter.counted() |
| * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition() |
| * |
| * It would be naive to query for exactly that many rows, as it's possible and not unlikely |
| * that some of the returned rows would also be shadowed by tombstones from other hosts. |
| * |
| * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result; |
| * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it. |
| * |
| * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number |
| * of rows fetched: there is a high transactional cost for every individual request, but a relatively low |
| * marginal cost for each extra row requested. |
| * |
| * As such it's better to overfetch than to underfetch extra rows from a host; but at the same |
| * time we want to respect paging limits and not blow up spectacularly. |
| * |
| * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only |
| * counts. |
| * |
| * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits. |
| * |
| * See CASSANDRA-13794 for more details. |
| */ |
| lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount()); |
| |
| ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark(); |
| Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source); |
| |
| SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried); |
| return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd); |
| } |
| |
| // Counts the number of rows for regular queries and the number of groups for GROUP BY queries |
| private int countedInCurrentPartition(Counter counter) |
| { |
| return command.limits().isGroupByLimit() |
| ? counter.rowCountedInCurrentPartition() |
| : counter.countedInCurrentPartition(); |
| } |
| |
| private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery) |
| { |
| ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); |
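| // Resume from just after the last clustering seen (hence inclusive = false). For example (hypothetical |
| // values): having already received rows up to clustering (a, 2), the retry asks the replica only for rows |
| // strictly after (a, 2) in clustering order. |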
| if (null != lastClustering) |
| filter = filter.forPaging(metadata.comparator, lastClustering, false); |
| |
| return SinglePartitionReadCommand.create(command.isForThrift(), |
| command.metadata(), |
| command.nowInSec(), |
| command.columnFilter(), |
| command.rowFilter(), |
| command.limits().forShortReadRetry(toQuery), |
| partitionKey, |
| filter, |
| command.indexMetadata()); |
| } |
| } |
| |
| private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd) |
| { |
| DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime); |
| ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime); |
| |
| if (StorageProxy.canDoLocalRequest(source)) |
| StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); |
| else |
| MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler); |
| |
| // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. |
| handler.awaitResults(); |
| assert resolver.responses.size() == 1; |
| return resolver.responses.get(0).payload.makeIterator(command); |
| } |
| } |
| } |