blob: c39862adffa20373e9ca40b1f89da9676b73ab83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.locator;
import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForRead;
import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForWrite;
import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor;
import static org.apache.cassandra.db.ConsistencyLevel.localQuorumForOurDc;
import static org.apache.cassandra.locator.Replicas.addToCountPerDc;
import static org.apache.cassandra.locator.Replicas.countInOurDc;
import static org.apache.cassandra.locator.Replicas.countPerDc;
public class ReplicaPlans
{
private static final Logger logger = LoggerFactory.getLogger(ReplicaPlans.class);
public static boolean isSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas)
{
switch (consistencyLevel)
{
case ANY:
// local hint is acceptable, and local node is always live
return true;
case LOCAL_ONE:
return countInOurDc(liveReplicas).hasAtleast(1, 1);
case LOCAL_QUORUM:
return countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(replicationStrategy), 1);
case EACH_QUORUM:
if (replicationStrategy instanceof NetworkTopologyStrategy)
{
int fullCount = 0;
Collection<String> dcs = ((NetworkTopologyStrategy) replicationStrategy).getDatacenters();
for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, liveReplicas))
{
Replicas.ReplicaCount count = entry.value;
if (!count.hasAtleast(localQuorumFor(replicationStrategy, entry.key), 0))
return false;
fullCount += count.fullReplicas();
}
return fullCount > 0;
}
// Fallthough on purpose for SimpleStrategy
default:
return liveReplicas.size() >= consistencyLevel.blockFor(replicationStrategy)
&& Replicas.countFull(liveReplicas) > 0;
}
}
static void assureSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException
{
assureSufficientLiveReplicas(replicationStrategy, consistencyLevel, liveReplicas, consistencyLevel.blockFor(replicationStrategy), 1);
}
static void assureSufficientLiveReplicasForWrite(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
{
assureSufficientLiveReplicas(replicationStrategy, consistencyLevel, allLive, consistencyLevel.blockForWrite(replicationStrategy, pendingWithDown), 0);
}
static void assureSufficientLiveReplicas(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
{
switch (consistencyLevel)
{
case ANY:
// local hint is acceptable, and local node is always live
break;
case LOCAL_ONE:
{
Replicas.ReplicaCount localLive = countInOurDc(allLive);
if (!localLive.hasAtleast(blockFor, blockForFullReplicas))
throw UnavailableException.create(consistencyLevel, 1, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas());
break;
}
case LOCAL_QUORUM:
{
Replicas.ReplicaCount localLive = countInOurDc(allLive);
if (!localLive.hasAtleast(blockFor, blockForFullReplicas))
{
if (logger.isTraceEnabled())
{
logger.trace(String.format("Local replicas %s are insufficient to satisfy LOCAL_QUORUM requirement of %d live replicas and %d full replicas in '%s'",
allLive.filter(InOurDc.replicas()), blockFor, blockForFullReplicas, DatabaseDescriptor.getLocalDataCenter()));
}
throw UnavailableException.create(consistencyLevel, blockFor, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas());
}
break;
}
case EACH_QUORUM:
if (replicationStrategy instanceof NetworkTopologyStrategy)
{
int total = 0;
int totalFull = 0;
Collection<String> dcs = ((NetworkTopologyStrategy) replicationStrategy).getDatacenters();
for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, allLive))
{
int dcBlockFor = localQuorumFor(replicationStrategy, entry.key);
Replicas.ReplicaCount dcCount = entry.value;
if (!dcCount.hasAtleast(dcBlockFor, 0))
throw UnavailableException.create(consistencyLevel, entry.key, dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas());
totalFull += dcCount.fullReplicas();
total += dcCount.allReplicas();
}
if (totalFull < blockForFullReplicas)
throw UnavailableException.create(consistencyLevel, blockFor, total, blockForFullReplicas, totalFull);
break;
}
// Fallthough on purpose for SimpleStrategy
default:
int live = allLive.size();
int full = Replicas.countFull(allLive);
if (live < blockFor || full < blockForFullReplicas)
{
if (logger.isTraceEnabled())
logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(allLive), blockFor);
throw UnavailableException.create(consistencyLevel, blockFor, blockForFullReplicas, live, full);
}
break;
}
}
/**
* Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive.
*/
public static ReplicaPlan.ForWrite forSingleReplicaWrite(Keyspace keyspace, Token token, Replica replica)
{
EndpointsForToken one = EndpointsForToken.of(token, replica);
EndpointsForToken empty = EndpointsForToken.empty(token);
return new ReplicaPlan.ForWrite(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, empty, one, one, one);
}
/**
* A forwarding counter write is always sent to a single owning coordinator for the range, by the original coordinator
* (if it is not itself an owner)
*/
public static ReplicaPlan.ForWrite forForwardingCounterWrite(Keyspace keyspace, Token token, Replica replica)
{
return forSingleReplicaWrite(keyspace, token, replica);
}
public static ReplicaPlan.ForWrite forLocalBatchlogWrite()
{
Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
systemKeypsace.getReplicationStrategy(),
EndpointsForToken.of(token, localSystemReplica),
EndpointsForToken.empty(token)
);
return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
}
/**
* Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
* Note that the liveAndDown collection and live are equal to the provided endpoints.
*
* @param isAny if batch consistency level is ANY, in which case a local node will be picked
*/
public static ReplicaPlan.ForWrite forBatchlogWrite(boolean isAny) throws UnavailableException
{
// A single case we write not for range or token, but multiple mutations to many tokens
Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks()
.get(snitch.getLocalDatacenter()));
// Replicas are picked manually:
// - replicas should be alive according to the failure detector
// - replicas should be in the local datacenter
// - choose min(2, number of qualifying candiates above)
// - allow the local node to be the only replica only if it's a single-node DC
Collection<InetAddressAndPort> chosenEndpoints = filterBatchlogEndpoints(snitch.getLocalRack(), localEndpoints);
if (chosenEndpoints.isEmpty() && isAny)
chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
systemKeypsace.getReplicationStrategy(),
SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
EndpointsForToken.empty(token)
);
// Batchlog is hosted by either one node or two nodes from different racks.
ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
// assume that we have already been given live endpoints, and skip applying the failure detector
return forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
}
private static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
Multimap<String, InetAddressAndPort> endpoints)
{
return filterBatchlogEndpoints(localRack,
endpoints,
Collections::shuffle,
FailureDetector.isEndpointAlive,
ThreadLocalRandom.current()::nextInt);
}
// Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
@VisibleForTesting
public static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
Multimap<String, InetAddressAndPort> endpoints,
Consumer<List<?>> shuffle,
Predicate<InetAddressAndPort> isAlive,
Function<Integer, Integer> indexPicker)
{
// special case for single-node data centers
if (endpoints.values().size() == 1)
return endpoints.values();
// strip out dead endpoints and localhost
ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create();
for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
{
InetAddressAndPort addr = entry.getValue();
if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr))
validated.put(entry.getKey(), entry.getValue());
}
if (validated.size() <= 2)
return validated.values();
if (validated.size() - validated.get(localRack).size() >= 2)
{
// we have enough endpoints in other racks
validated.removeAll(localRack);
}
if (validated.keySet().size() == 1)
{
/*
* we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
* pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
* because of the preceding if block.
*/
List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values());
shuffle.accept(otherRack);
return otherRack.subList(0, 2);
}
// randomize which racks we pick from if more than 2 remaining
Collection<String> racks;
if (validated.keySet().size() == 2)
{
racks = validated.keySet();
}
else
{
racks = Lists.newArrayList(validated.keySet());
shuffle.accept((List<?>) racks);
}
// grab a random member of up to two racks
List<InetAddressAndPort> result = new ArrayList<>(2);
for (String rack : Iterables.limit(racks, 2))
{
List<InetAddressAndPort> rackMembers = validated.get(rack);
result.add(rackMembers.get(indexPicker.apply(rackMembers.size())));
}
return result;
}
public static ReplicaPlan.ForWrite forReadRepair(Token token, ReplicaPlan<?, ?> readPlan) throws UnavailableException
{
return forWrite(readPlan.keyspace(), readPlan.consistencyLevel(), token, writeReadRepair(readPlan));
}
public static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
{
return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
}
@VisibleForTesting
public static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
{
return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(keyspace.getReplicationStrategy(), natural, pending), isAlive, selector);
}
public static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Selector selector) throws UnavailableException
{
return forWrite(keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector);
}
private static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
{
ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
return forWrite(keyspace, consistencyLevel, liveAndDown, live, selector);
}
public static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
{
assert liveAndDown.replicationStrategy() == live.replicationStrategy()
: "ReplicaLayout liveAndDown and live should be derived from the same replication strategy.";
AbstractReplicationStrategy replicationStrategy = liveAndDown.replicationStrategy();
EndpointsForToken contacts = selector.select(consistencyLevel, liveAndDown, live);
assureSufficientLiveReplicasForWrite(replicationStrategy, consistencyLevel, live.all(), liveAndDown.pending());
return new ReplicaPlan.ForWrite(keyspace, replicationStrategy, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
}
public interface Selector
{
/**
* Select the {@code Endpoints} from {@param liveAndDown} and {@param live} to contact according to the consistency level.
*/
<E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live);
}
/**
* Select all nodes, transient or otherwise, as targets for the operation.
*
* This is may no longer be useful once we finish implementing transient replication support, however
* it can be of value to stipulate that a location writes to all nodes without regard to transient status.
*/
public static final Selector writeAll = new Selector()
{
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
{
return liveAndDown.all();
}
};
/**
* Select all full nodes, live or down, as write targets. If there are insufficient nodes to complete the write,
* but there are live transient nodes, select a sufficient number of these to reach our consistency level.
*
* Pending nodes are always contacted, whether or not they are full. When a transient replica is undergoing
* a pending move to a new node, if we write (transiently) to it, this write would not be replicated to the
* pending transient node, and so when completing the move, the write could effectively have not reached the
* promised consistency level.
*/
public static final Selector writeNormal = new Selector()
{
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
{
if (!any(liveAndDown.all(), Replica::isTransient))
return liveAndDown.all();
ReplicaCollection.Builder<E> contacts = liveAndDown.all().newBuilder(liveAndDown.all().size());
contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
contacts.addAll(liveAndDown.pending());
/**
* Per CASSANDRA-14768, we ensure we write to at least a QUORUM of nodes in every DC,
* regardless of how many responses we need to wait for and our requested consistencyLevel.
* This is to minimally surprise users with transient replication; with normal writes, we
* soft-ensure that we reach QUORUM in all DCs we are able to, by writing to every node;
* even if we don't wait for ACK, we have in both cases sent sufficient messages.
*/
ObjectIntHashMap<String> requiredPerDc = eachQuorumForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending());
addToCountPerDc(requiredPerDc, live.natural().filter(Replica::isFull), -1);
addToCountPerDc(requiredPerDc, live.pending(), -1);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
for (Replica replica : filter(live.natural(), Replica::isTransient))
{
String dc = snitch.getDatacenter(replica);
if (requiredPerDc.addTo(dc, -1) >= 0)
contacts.add(replica);
}
return contacts.build();
}
};
/**
* TODO: Transient Replication C-14404/C-14665
* TODO: We employ this even when there is no monotonicity to guarantee,
* e.g. in case of CL.TWO, CL.ONE with speculation, etc.
*
* Construct a read-repair write plan to provide monotonicity guarantees on any data we return as part of a read.
*
* Since this is not a regular write, this is just to guarantee future reads will read this data, we select only
* the minimal number of nodes to meet the consistency level, and prefer nodes we contacted on read to minimise
* data transfer.
*/
public static Selector writeReadRepair(ReplicaPlan<?, ?> readPlan)
{
return new Selector()
{
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
{
assert !any(liveAndDown.all(), Replica::isTransient);
ReplicaCollection.Builder<E> contacts = live.all().newBuilder(live.all().size());
// add all live nodes we might write to that we have already contacted on read
contacts.addAll(filter(live.all(), r -> readPlan.contacts().endpoints().contains(r.endpoint())));
// finally, add sufficient nodes to achieve our consistency level
if (consistencyLevel != EACH_QUORUM)
{
int add = consistencyLevel.blockForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending()) - contacts.size();
if (add > 0)
{
for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
{
contacts.add(replica);
if (--add == 0)
break;
}
}
}
else
{
ObjectIntHashMap<String> requiredPerDc = eachQuorumForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending());
addToCountPerDc(requiredPerDc, contacts.snapshot(), -1);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
{
String dc = snitch.getDatacenter(replica);
if (requiredPerDc.addTo(dc, -1) >= 0)
contacts.add(replica);
}
}
return contacts.build();
}
};
}
/**
* Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison,
* but for the paxos linearisation agreement.
*
* This will select all live nodes as the candidates for the operation. Only the required number of participants
*/
public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
{
Token tk = key.getToken();
ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
Replicas.temporaryAssertFull(liveAndDown.all()); // TODO CASSANDRA-14547
if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
{
// TODO: we should cleanup our semantics here, as we're filtering ALL nodes to localDC which is unexpected for ReplicaPlan
// Restrict natural and pending to node in the local DC only
liveAndDown = liveAndDown.filter(InOurDc.replicas());
}
ReplicaLayout.ForTokenWrite live = liveAndDown.filter(FailureDetector.isReplicaAlive);
// TODO: this should use assureSufficientReplicas
int participants = liveAndDown.all().size();
int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
EndpointsForToken contacts = live.all();
if (contacts.size() < requiredParticipants)
throw UnavailableException.create(consistencyForPaxos, requiredParticipants, contacts.size());
// We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
// Note that we fake an impossible number of required nodes in the unavailable exception
// to nail home the point that it's an impossible operation no matter how many nodes are live.
if (liveAndDown.pending().size() > 1)
throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", liveAndDown.all().size()),
consistencyForPaxos,
participants + 1,
contacts.size());
return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, requiredParticipants);
}
private static <E extends Endpoints<E>> E candidatesForRead(ConsistencyLevel consistencyLevel, E liveNaturalReplicas)
{
return consistencyLevel.isDatacenterLocal()
? liveNaturalReplicas.filter(InOurDc.replicas())
: liveNaturalReplicas;
}
private static <E extends Endpoints<E>> E contactForEachQuorumRead(NetworkTopologyStrategy replicationStrategy, E candidates)
{
ObjectIntHashMap<String> perDc = eachQuorumForRead(replicationStrategy);
final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
return candidates.filter(replica -> {
String dc = snitch.getDatacenter(replica);
return perDc.addTo(dc, -1) >= 0;
});
}
private static <E extends Endpoints<E>> E contactForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
{
/*
* If we are doing an each quorum query, we have to make sure that the endpoints we select
* provide a quorum for each data center. If we are not using a NetworkTopologyStrategy,
* we should fall through and grab a quorum in the replication strategy.
*
* We do not speculate for EACH_QUORUM.
*
* TODO: this is still very inconistently managed between {LOCAL,EACH}_QUORUM and other consistency levels - should address this in a follow-up
*/
if (consistencyLevel == EACH_QUORUM && replicationStrategy instanceof NetworkTopologyStrategy)
return contactForEachQuorumRead((NetworkTopologyStrategy) replicationStrategy, candidates);
int count = consistencyLevel.blockFor(replicationStrategy) + (alwaysSpeculate ? 1 : 0);
return candidates.subList(0, Math.min(count, candidates.size()));
}
/**
* Construct a plan for reading from a single node - this permits no speculation or read-repair
*/
public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace keyspace, Token token, Replica replica)
{
EndpointsForToken one = EndpointsForToken.of(token, replica);
return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one);
}
/**
* Construct a plan for reading from a single node - this permits no speculation or read-repair
*/
public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica, int vnodeCount)
{
// TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
EndpointsForRange one = EndpointsForRange.of(replica);
return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, vnodeCount);
}
/**
* Construct a plan for reading the provided token at the provided consistency level. This translates to a collection of
* - candidates who are: alive, replicate the token, and are sorted by their snitch scores
* - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) candidates
*
* The candidate collection can be used for speculation, although at present
* it would break EACH_QUORUM to do so without further filtering
*/
public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
{
AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
EndpointsForToken candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forTokenReadLiveSorted(replicationStrategy, token).natural());
EndpointsForToken contacts = contactForRead(replicationStrategy, consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), candidates);
assureSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, contacts);
return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts);
}
/**
* Construct a plan for reading the provided range at the provided consistency level. This translates to a collection of
* - candidates who are: alive, replicate the range, and are sorted by their snitch scores
* - contacts who are: the first blockFor candidates
*
* There is no speculation for range read queries at present, so we never 'always speculate' here, and a failed response fails the query.
*/
public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, int vnodeCount)
{
AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
EndpointsForRange candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forRangeReadLiveSorted(replicationStrategy, range).natural());
EndpointsForRange contacts = contactForRead(replicationStrategy, consistencyLevel, false, candidates);
assureSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, contacts);
return new ReplicaPlan.ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, candidates, contacts, vnodeCount);
}
/**
* Take two range read plans for adjacent ranges, and check if it is OK (and worthwhile) to combine them into a single plan
*/
public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaPlan.ForRangeRead left, ReplicaPlan.ForRangeRead right)
{
// TODO: should we be asserting that the ranges are adjacent?
AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right);
EndpointsForRange mergedCandidates = left.readCandidates().keep(right.readCandidates().endpoints());
AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
// Check if there are enough shared endpoints for the merge to be possible.
if (!isSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, mergedCandidates))
return null;
EndpointsForRange contacts = contactForRead(replicationStrategy, consistencyLevel, false, mergedCandidates);
// Estimate whether merging will be a win or not
if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts()))
return null;
// If we get there, merge this range and the next one
return new ReplicaPlan.ForRangeRead(keyspace, replicationStrategy, consistencyLevel, newRange, mergedCandidates, contacts, left.vnodeCount() + right.vnodeCount());
}
}