/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.reads.range;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import com.google.common.annotations.VisibleForTesting;

public class RangeCommands
{
private static final Logger logger = LoggerFactory.getLogger(RangeCommands.class);

private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;

/**
 * The maximum number of sub-ranges that the coordinator can request in parallel for range queries. Previously
 * we would request up to the maximum number of ranges, but this causes problems if the number of vnodes is large.
 * By default we pick 10 requests per core, assuming all replicas have the same number of cores. The idea is that
 * we don't want a burst of range requests that will back up, hurting all other queries, while at the same time
 * we want to give range queries a chance to run if resources are available.
 */
private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests",
FBUtilities.getAvailableProcessors() * 10));
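// Illustrative example (numbers assumed, not from the source): on a 16-core coordinator with no
// override this resolves to max(1, 16 * 10) = 160; since the default comes from a system property,
// it can be tuned at startup with e.g. -Dcassandra.max_concurrent_range_requests=32.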
@SuppressWarnings("resource") // created iterators will be closed in CQL layer through the chain of transformations
public static PartitionIterator partitions(PartitionRangeReadCommand command,
ConsistencyLevel consistencyLevel,
long queryStartNanoTime)
{
// Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
RangeCommandIterator rangeCommands = rangeCommandIterator(command, consistencyLevel, queryStartNanoTime);
return command.limits().filter(command.postReconciliationProcessing(rangeCommands),
command.nowInSec(),
command.selectsFullPartition(),
command.metadata().enforceStrictLiveness());
}

@VisibleForTesting
@SuppressWarnings("resource") // created iterators will be closed in CQL layer through the chain of transformations
static RangeCommandIterator rangeCommandIterator(PartitionRangeReadCommand command,
ConsistencyLevel consistencyLevel,
long queryStartNanoTime)
{
Tracing.trace("Computing ranges to query");
Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
ReplicaPlanIterator replicaPlans = new ReplicaPlanIterator(command.dataRange().keyRange(), keyspace, consistencyLevel);
// our estimate of how many result rows there will be per-range
float resultsPerRange = estimateResultsPerRange(command, keyspace);
// underestimate how many rows we will get per-range in order to increase the likelihood that we'll
// fetch enough rows in the first round
resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
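// For example (illustrative numbers): an estimate of 10.0 rows per range becomes
// 10.0 - 10.0 * 0.10 = 9.0, which nudges the concurrency factor computed below upward,
// making it likelier that the first round fetches enough rows.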
int maxConcurrencyFactor = Math.min(replicaPlans.size(), MAX_CONCURRENT_RANGE_REQUESTS);
int concurrencyFactor = resultsPerRange == 0.0
? 1
: Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
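// Worked example (illustrative numbers): a limit of 100 rows with 9.0 estimated rows per range
// gives ceil(100 / 9.0) = 12 concurrent range requests, unless maxConcurrencyFactor clamps it
// lower; an estimate of exactly 0.0 falls back to a single request to avoid dividing by zero.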
logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
resultsPerRange, command.limits().count(), replicaPlans.size(), concurrencyFactor);
Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
replicaPlans.size(), concurrencyFactor, resultsPerRange);
ReplicaPlanMerger mergedReplicaPlans = new ReplicaPlanMerger(replicaPlans, keyspace, consistencyLevel);
return new RangeCommandIterator(mergedReplicaPlans,
command,
concurrencyFactor,
maxConcurrencyFactor,
replicaPlans.size(),
queryStartNanoTime);
}

/**
* Estimate the number of result rows per range in the ring based on our local data.
* <p>
* This assumes that ranges are uniformly distributed across the cluster and
* that the queried data is also uniformly distributed.
*/
@VisibleForTesting
static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
{
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
Index index = command.getIndex(cfs);
float maxExpectedResults = index == null
? command.limits().estimateTotalResults(cfs)
: index.getEstimatedResultRows();
// adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks
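// Worked example (illustrative numbers): 4800 locally estimated rows on a node with
// num_tokens=16 in a keyspace with RF=3 gives (4800 / 16) / 3 = 100 expected rows per range.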
return (maxExpectedResults / DatabaseDescriptor.getNumTokens())
/ keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
}
}