/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.cassandra.service.reads;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.transform.MorePartitions;
import org.apache.cassandra.db.transform.MoreRows;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.ExcludingBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.tracing.Tracing;
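/**
 * A {@link Transformation} that protects a range read against "short reads": cases where a replica
 * returns as many rows as requested, but some of them are then shadowed by deletions from other
 * replicas during reconciliation, leaving the merged result below the query limit even though the
 * replica may still have matching rows. When that happens, {@link #moreContents()} transparently
 * fetches additional partitions from the same replica.
 *
 * A sketch of how this transformation is typically wired up (variable names are illustrative,
 * not part of this file):
 * <pre>{@code
 * ShortReadPartitionsProtection srpp =
 *     new ShortReadPartitionsProtection(command, source, preFetchCallback,
 *                                       singleResultCounter, mergedResultCounter, queryStartNanoTime);
 * partitions = Transformation.apply(partitions, srpp);   // track per-partition / per-row state
 * partitions = MorePartitions.extend(partitions, srpp);  // allow fetching more partitions on demand
 * }</pre>
 */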
public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
{
    private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class);

    private final ReadCommand command; // the original range read command being protected
    private final Replica source; // the replica whose response this transformation tracks and re-queries
    private final Runnable preFetchCallback; // called immediately before fetching more contents

    private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
    private final DataLimits.Counter mergedResultCounter; // merged end-result counter

    private DecoratedKey lastPartitionKey; // key of the last observed partition
    private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call

    private final long queryStartNanoTime; // query start (nanoTime-based), used downstream for timeout accounting

    public ShortReadPartitionsProtection(ReadCommand command,
Replica source,
Runnable preFetchCallback,
DataLimits.Counter singleResultCounter,
DataLimits.Counter mergedResultCounter,
long queryStartNanoTime)
{
this.command = command;
this.source = source;
this.preFetchCallback = preFetchCallback;
this.singleResultCounter = singleResultCounter;
this.mergedResultCounter = mergedResultCounter;
this.queryStartNanoTime = queryStartNanoTime;
    }

    @Override
public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
{
        partitionsFetched = true;

        lastPartitionKey = partition.partitionKey();

        /*
         * Extend the partition with MoreRows first, then apply the protection second, so that
         * applyToRow() can keep tracking lastClustering.
         *
         * If we applied the transformation *before* extending the partition with MoreRows,
         * applyToRow() would not be called on the first row of the new extension iterator.
         */
ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
command, source,
(cmd) -> executeReadCommand(cmd, sharedReplicaPlan),
singleResultCounter,
mergedResultCounter);
return Transformation.apply(MoreRows.extend(partition, protection), protection);
    }

    /*
     * We only get here once all the rows and partitions in this iterator have been iterated over, so
     * if the node returned the requested number of rows but we still get here, then some results were
     * skipped during reconciliation.
     */
    @Override
    public UnfilteredPartitionIterator moreContents()
{
        // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
        assert !mergedResultCounter.isDone();

        // we do not apply short read protection when we have no limits at all
        assert !command.limits().isUnlimited();

        /*
         * If this is a single partition read command or an (indexed) partition range read command with
         * a partition key specified, then we can't and shouldn't try to fetch more partitions.
         */
        assert !command.isLimitedToOnePartition();

        /*
         * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
         *
         * We can only take this shortcut when no per partition limit is set; otherwise it's possible to hit false
         * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
         */
        if (command.limits().isExhausted(singleResultCounter) && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
            return null;

        /*
         * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
         * There is no point in asking the replica for more rows - it has no more in the requested range.
         */
        if (!partitionsFetched)
            return null;

        partitionsFetched = false;
        /*
         * We will request either one partition's worth of rows or all the rows still missing from the
         * global limit: the row limit is set to the per partition limit if the command has no total
         * row limit, or to the total # of rows remaining if it does. If we don't grab enough rows in
         * some of the partitions, then subsequent ShortReadRowsProtection.moreContents() calls will
         * fetch the missing ones.
         */
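        // e.g. a query with LIMIT 100 that has merged 80 rows so far will ask for 100 - 80 = 20 more
        // rows; a query with no total limit but PER PARTITION LIMIT 10 will ask for up to 10 rows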
int toQuery = command.limits().count() != DataLimits.NO_LIMIT
? command.limits().count() - mergedResultCounter.rowsCounted()
: command.limits().perPartitionCount();
        ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
        Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
        logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source);

        // If we've arrived here, all responses have been consumed, and we're about to request more.
        preFetchCallback.run();

        return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery);
    }

    private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery)
{
        PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;

        DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);

        AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
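        // Resume from just past the last partition key we've seen, keeping the original right bound:
        // e.g. an original range (min, max] with lastPartitionKey k is narrowed to (k, max]. Both
        // Range (end-inclusive) and ExcludingBounds (end-exclusive) exclude their left bound, so the
        // already-returned partition is never fetched twice.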
AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
? new Range<>(lastPartitionKey, bounds.right)
: new ExcludingBounds<>(lastPartitionKey, bounds.right);
        DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);

        ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source, 1);
return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan));
    }

    private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>>
UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan)
{
DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
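        // Read locally when this node is the source replica; otherwise send the command to the remote
        // source endpoint, copying it as a transient query first if the source is a transient replica.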
if (source.isSelf())
{
Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
}
else
{
if (source.isTransient())
cmd = cmd.copyAsTransientQuery(source);
MessagingService.instance().sendWithCallback(cmd.createMessage(false), source.endpoint(), handler);
        }

        // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
handler.awaitResults();
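        // we queried a single replica, so we expect exactly one response message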
assert resolver.getMessages().size() == 1;
return resolver.getMessages().get(0).payload.makeIterator(command);
}
}